import os
from typing import Dict, Iterator, List, Optional
import openai
from qwen_agent.llm.base import BaseChatModel
import re
import copy
import json
import time
from contextlib import asynccontextmanager
from typing import Dict, List, Literal, Optional, Union
import torch
from pydantic import BaseModel, Field
from sse_starlette.sse import EventSourceResponse
from transformers import AutoTokenizer, AutoModelForCausalLM
from transformers.generation import GenerationConfig
def _gc(forced: bool = False, disable_gc: bool = True):
if disable_gc and not forced:
return
import gc
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
class ChatMessage(BaseModel):
    """A single chat turn exchanged with the model.

    ``content`` carries an explicit ``None`` default so that the field is
    optional under both pydantic v1 and v2. Without the default, pydantic v2
    treats an ``Optional[str]`` annotation as "required but nullable", which
    rejects function-call turns that omit ``content``.
    """

    # Speaker of the turn.
    role: Literal["user", "assistant", "system", "function"]
    # Text of the turn; may be missing, e.g. on an assistant function call.
    content: Optional[str] = None
    # Requested function call; parse_messages reads its "name" and
    # "arguments" keys.
    function_call: Optional[Dict] = None
class DeltaMessage(BaseModel):
    """Incremental message fragment carried by one streamed chunk."""

    # Speaker of the fragment; left as None when not supplied.
    role: Optional[Literal["user", "assistant", "system"]] = None
    # Newly generated text; left as None when not supplied.
    content: Optional[str] = None
class ChatCompletionRequest(BaseModel):
    """Parameters of one chat-completion call."""

    # Model identifier; echoed back in the response objects.
    model: str
    # Conversation so far, oldest turn first.
    messages: List[ChatMessage]
    # Tool descriptions; when non-empty, the ReAct prompt is enabled.
    functions: Optional[List[Dict]] = None
    # Sampling temperature; values below 0.01 are mapped to top_k=1 by
    # create_chat_completion.
    temperature: Optional[float] = None
    # Nucleus-sampling threshold, forwarded as-is when given.
    top_p: Optional[float] = None
    # NOTE(review): not read by create_chat_completion in this file.
    max_length: Optional[int] = None
    # When true, create_chat_completion returns a generator of chunks.
    stream: Optional[bool] = False
    # Extra stop strings requested by the caller.
    stop: Optional[List[str]] = None
class ChatCompletionResponseChoice(BaseModel):
    """One complete (non-streamed) answer candidate."""

    # Position of the choice in the response.
    index: int
    # The generated assistant message.
    message: ChatMessage
    # Why generation ended; "function_call" marks a parsed tool invocation.
    finish_reason: Literal["stop", "length", "function_call"]
class ChatCompletionResponseStreamChoice(BaseModel):
    """One streamed answer fragment."""

    # Position of the choice in the response.
    index: int
    # The incremental piece of the message carried by this chunk.
    delta: DeltaMessage
    # None while streaming is in progress; set on the final chunk.
    finish_reason: Optional[Literal["stop", "length"]]
class ChatCompletionResponse(BaseModel):
    """Envelope for both complete responses and streamed chunks."""

    # Model identifier copied from the request.
    model: str
    # Distinguishes a full completion from a streamed chunk.
    object: Literal["chat.completion", "chat.completion.chunk"]
    # Complete or streamed choices, depending on `object`.
    choices: List[
        Union[ChatCompletionResponseChoice, ChatCompletionResponseStreamChoice]
    ]
    # Creation time in whole seconds since the epoch, taken at instantiation.
    created: Optional[int] = Field(default_factory=lambda: int(time.time()))
# To work around that unpleasant leading-\n tokenization issue!
def add_extra_stop_words(stop_words):
    """Return the stop words plus their variants without leading newlines.

    For every stop word, the form with leading ``"\\n"`` characters removed
    is appended when it is non-empty and not already present. A falsy input
    (``None`` or an empty list) is returned unchanged; otherwise a new list
    is returned and the input is left untouched.
    """
    if not stop_words:
        return stop_words

    augmented = list(stop_words)
    for word in stop_words:
        stripped = word.lstrip("\n")
        if stripped and stripped not in augmented:
            augmented.append(stripped)
    return augmented
def trim_stop_words(response, stop_words):
    """Cut ``response`` at the first occurrence of each stop word.

    The stop words are applied in order, each one to the text that is left
    after the previous cut.

    Args:
        response: Generated text.
        stop_words: Stop strings, or a falsy value for "no trimming".

    Returns:
        The text truncated before the earliest surviving stop word.
    """
    if not stop_words:
        return response

    for stop in stop_words:
        # An empty stop string matches at index 0 and would wipe the whole
        # response, so it is ignored.
        if not stop:
            continue
        idx = response.find(stop)
        if idx != -1:
            response = response[:idx]
    return response
# Template for the description of one tool. parse_messages fills the
# placeholders from each entry of the request's `functions` list.
TOOL_DESC = """{name_for_model}: Call this tool to interact with the {name_for_human} API. What is the {name_for_human} API useful for? {description_for_model} Parameters: {parameters}"""
# ReAct-style instruction. parse_messages formats it with the joined tool
# descriptions and tool names and appends it to the system text.
REACT_INSTRUCTION = """Answer the following questions as best you can. You have access to the following APIs:
{tools_text}
Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tools_name_text}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can be repeated zero or more times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question
Begin!"""
# Sentinel that parse_messages returns as the query when the conversation
# does not end with a user turn; create_chat_completion then continues the
# last assistant message via text_complete_last_message.
_TEXT_COMPLETION_CMD = object()
#
# Temporarily, the system role does not work as expected.
# We advise that you write the setups for role-play in your query,
# i.e., use the user role instead of the system role.
#
# TODO: Use real system role when the model is ready.
#
def parse_messages(messages, functions):
    """Convert chat messages into a ``(query, history)`` pair.

    A leading system message is removed and, unless it equals the default
    system prompt, kept as system text. When ``functions`` is non-empty, the
    ReAct instruction built from the tool descriptions is appended to it.
    Function results are folded into the preceding assistant turn as
    ``Observation:`` lines, and assistant function calls are rendered as
    ``Thought/Action/Action Input`` text.

    Args:
        messages: List of ChatMessage objects; deep-copied before use.
        functions: Optional list of tool-description dicts. Each must have a
            "parameters" key; the name and description keys are optional.

    Returns:
        ``(query, history)``. ``query`` is the content of the trailing user
        message, or ``_TEXT_COMPLETION_CMD`` when the conversation does not
        end with a user turn. ``history`` is a list of
        ``[user_text, assistant_text]`` pairs. Any remaining system text is
        prepended to the user text of the last history pair, or to ``query``
        when there is no history.

    Raises:
        Exception: If there is no user message, a role is unknown, or the
            turns do not alternate as user/assistant pairs.
    """
    if all(m.role != "user" for m in messages):
        raise Exception( f"Invalid request: Expecting at least one user message.",)
    # Work on a copy: message contents are edited in place below.
    messages = copy.deepcopy(messages)
    default_system = "You are a helpful assistant."
    system = ""
    if messages[0].role == "system":
        system = messages.pop(0).content.lstrip("\n").rstrip()
        # The default prompt carries no extra information, so it is dropped.
        if system == default_system:
            system = ""
    if functions:
        tools_text = []
        tools_name_text = []
        for func_info in functions:
            # The *_for_model / *_for_human keys fall back to the plain
            # "name" and "description" keys.
            name = func_info.get("name", "")
            name_m = func_info.get("name_for_model", name)
            name_h = func_info.get("name_for_human", name)
            desc = func_info.get("description", "")
            desc_m = func_info.get("description_for_model", desc)
            tool = TOOL_DESC.format(
                name_for_model=name_m,
                name_for_human=name_h,
                # Hint: You can add the following format requirements in description:
                # "Format the arguments as a JSON object."
                # "Enclose the code within triple backticks (`) at the beginning and end of the code."
                description_for_model=desc_m,
                parameters=json.dumps(func_info["parameters"], ensure_ascii=False),
            )
            tools_text.append(tool)
            tools_name_text.append(name_m)
        tools_text = "\n\n".join(tools_text)
        tools_name_text = ", ".join(tools_name_text)
        system += "\n\n" + REACT_INSTRUCTION.format(
            tools_text=tools_text,
            tools_name_text=tools_name_text,
        )
        system = system.lstrip("\n").rstrip()
    # Prefix put in front of plain assistant answers when tools are enabled.
    dummy_thought = {
        "en": "\nThought: I now know the final answer.\nFinal answer: ",
        "zh": "\nThought: 我会作答了。\nFinal answer: ",
    }
    # First pass: merge function and assistant turns so that the rebuilt
    # `messages` list only contains user and assistant entries.
    _messages = messages
    messages = []
    for m_idx, m in enumerate(_messages):
        role, content, func_call = m.role, m.content, m.function_call
        if content:
            content = content.lstrip("\n").rstrip()
        if role == "function":
            if (len(messages) == 0) or (messages[-1].role != "assistant"):
                raise Exception("Invalid request: Expecting role assistant before role function.")
            messages[-1].content += f"\nObservation: {content}"
            # A trailing function result leaves an open "Thought:" for the
            # model to continue from.
            if m_idx == len(_messages) - 1:
                messages[-1].content += "\nThought:"
        elif role == "assistant":
            if len(messages) == 0:
                raise Exception(f"Invalid request: Expecting role user before role assistant.")
            last_msg = messages[-1].content
            # CJK characters in the previous turn select the Chinese wording.
            last_msg_has_zh = len(re.findall(r"[\u4e00-\u9fff]+", last_msg)) > 0
            if func_call is None:
                if functions:
                    content = dummy_thought["zh" if last_msg_has_zh else "en"] + content
            else:
                f_name, f_args = func_call["name"], func_call["arguments"]
                if not content:
                    if last_msg_has_zh:
                        content = f"Thought: 我可以使用 {f_name} API。"
                    else:
                        content = f"Thought: I can use {f_name}."
                content = f"\n{content}\nAction: {f_name}\nAction Input: {f_args}"
            if messages[-1].role == "user":
                messages.append(
                    ChatMessage(role="assistant", content=content.lstrip("\n").rstrip())
                )
            else:
                # Consecutive assistant output is concatenated into one turn.
                messages[-1].content += content
        elif role == "user":
            messages.append(
                ChatMessage(role="user", content=content.lstrip("\n").rstrip())
            )
        else:
            raise Exception(
                f"Invalid request: Incorrect role {role}."
            )
    # Without a trailing user turn, the caller continues the last assistant
    # message instead of answering a new query.
    query = _TEXT_COMPLETION_CMD
    if messages[-1].role == "user":
        query = messages[-1].content
        messages = messages[:-1]
    if len(messages) % 2 != 0:
        raise Exception("Invalid request")
    history = [] # [(Q1, A1), (Q2, A2), ..., (Q_last_turn, A_last_turn)]
    # Second pass: pair the remaining turns as (user, assistant).
    for i in range(0, len(messages), 2):
        if messages[i].role == "user" and messages[i + 1].role == "assistant":
            usr_msg = messages[i].content.lstrip("\n").rstrip()
            bot_msg = messages[i + 1].content.lstrip("\n").rstrip()
            # The system text is injected once, into the last history pair.
            if system and (i == len(messages) - 2):
                usr_msg = f"{system}\n\nQuestion: {usr_msg}"
                system = ""
            # Drop the dummy thought again when the turn went on to call a tool.
            for t in dummy_thought.values():
                t = t.lstrip("\n")
                if bot_msg.startswith(t) and ("\nAction: " in bot_msg):
                    bot_msg = bot_msg[len(t):]
            history.append([usr_msg, bot_msg])
        else:
            raise Exception("Invalid request: Expecting exactly one user (or function) role before every assistant role.")
    # With no history to carry it, the system text goes into the query.
    if system:
        assert query is not _TEXT_COMPLETION_CMD
        query = f"{system}\n\nQuestion: {query}"
    return query, history
def parse_response(response):
    """Turn ReAct-formatted model output into a response choice.

    When the text contains an ``Action:`` line followed by an
    ``Action Input:`` line and the action name is non-empty, the result is a
    function-call choice whose content is the text before the action.
    Otherwise the result is a plain "stop" choice holding the text after the
    last ``Final Answer: `` marker, or the whole text when there is none.

    Args:
        response: Raw text generated by the model.

    Returns:
        A ChatCompletionResponseChoice with index 0.
    """
    action_tag = "\nAction:"
    input_tag = "\nAction Input:"
    observation_tag = "\nObservation:"
    answer_tag = "\nFinal Answer: "

    action_pos = response.rfind(action_tag)
    input_pos = response.rfind(input_tag)
    observation_pos = response.rfind(observation_tag)

    func_name = func_args = ""
    if 0 <= action_pos < input_pos:
        if observation_pos < input_pos:
            # No `Observation` after the action input: it was most likely cut
            # off as a stop word, so it is added back to delimit the arguments.
            response = response.rstrip() + observation_tag
            observation_pos = response.rfind(observation_tag)
        func_name = response[action_pos + len(action_tag): input_pos].strip()
        func_args = response[input_pos + len(input_tag): observation_pos].strip()

    if func_name:
        call_message = ChatMessage(
            role="assistant",
            content=response[:action_pos],
            function_call={"name": func_name, "arguments": func_args},
        )
        return ChatCompletionResponseChoice(
            index=0,
            message=call_message,
            finish_reason="function_call",
        )

    answer_pos = response.rfind(answer_tag)
    if answer_pos >= 0:
        response = response[answer_pos + len(answer_tag):]
    return ChatCompletionResponseChoice(
        index=0,
        message=ChatMessage(role="assistant", content=response),
        finish_reason="stop",
    )
# completion mode, not chat mode
def text_complete_last_message(history, stop_words_ids, gen_kwargs):
    """Continue the last assistant message of ``history`` in completion mode.

    The history is rendered as a ChatML prompt, and the closing end marker is
    removed so that generation carries on from the last message. Uses the
    module-level ``model`` and ``tokenizer``.

    Args:
        history: Sequence of ``(query, response)`` pairs.
        stop_words_ids: Optional token-id lists of extra stop words.
        gen_kwargs: Extra keyword arguments for ``model.generate``.

    Returns:
        The newly generated text, trimmed at ``<|endoftext|>`` or at the end
        marker.
    """
    im_start = "<|im_start|>"
    im_end = "<|im_end|>"

    segments = [f"{im_start}system\nYou are a helpful assistant.{im_end}"]
    for user_turn, bot_turn in history:
        user_turn = user_turn.lstrip("\n").rstrip()
        bot_turn = bot_turn.lstrip("\n").rstrip()
        segments.append(f"\n{im_start}user\n{user_turn}{im_end}")
        segments.append(f"\n{im_start}assistant\n{bot_turn}{im_end}")
    # Drop the closing marker so the model keeps writing the last message.
    prompt = "".join(segments)[: -len(im_end)]

    # The end marker always acts as a stop word, ahead of the caller's own.
    all_stop_ids = [tokenizer.encode(im_end)]
    if stop_words_ids:
        all_stop_ids.extend(stop_words_ids)

    input_ids = torch.tensor([tokenizer.encode(prompt)]).to(model.device)
    generated = model.generate(input_ids, stop_words_ids=all_stop_ids, **gen_kwargs)
    decoded = tokenizer.decode(generated.tolist()[0], errors="ignore")
    assert decoded.startswith(prompt)
    completion = trim_stop_words(decoded[len(prompt):], ["<|endoftext|>", im_end])
    print(f"\n{prompt}\n\n{completion}\n")
    return completion
def create_chat_completion(request: ChatCompletionRequest):
    """Answer a chat-completion request with the module-level model.

    Args:
        request: The request to serve.

    Returns:
        In stream mode, the generator produced by ``predict``. Otherwise a
        ChatCompletionResponse with a single choice.

    Raises:
        Exception: If streaming is requested together with functions.
    """
    global model, tokenizer

    # Sampling options forwarded to the model.
    gen_kwargs = {}
    temperature = request.temperature
    if temperature is not None:
        if temperature < 0.01:
            gen_kwargs['top_k'] = 1  # greedy decoding
        else:
            # Not recommended. Please tune top_p instead.
            gen_kwargs['temperature'] = temperature
    if request.top_p is not None:
        gen_kwargs['top_p'] = request.top_p

    # Tool use needs "Observation:" as a stop word, so generation halts
    # where the tool result belongs.
    stop_words = add_extra_stop_words(request.stop)
    if request.functions:
        stop_words = stop_words or []
        if "Observation:" not in stop_words:
            stop_words.append("Observation:")

    query, history = parse_messages(request.messages, request.functions)

    if request.stream:
        if request.functions:
            raise Exception("Invalid request: Function calling is not yet implemented for stream mode.")
        # The raw generator is returned, not wrapped in an SSE response.
        return predict(query, history, request.model, stop_words, gen_kwargs)

    stop_words_ids = None
    if stop_words:
        stop_words_ids = [tokenizer.encode(s) for s in stop_words]

    if query is _TEXT_COMPLETION_CMD:
        response = text_complete_last_message(
            history, stop_words_ids=stop_words_ids, gen_kwargs=gen_kwargs
        )
    else:
        response, _ = model.chat(
            tokenizer,
            query,
            history=history,
            stop_words_ids=stop_words_ids,
            **gen_kwargs
        )
        print(f"\n{history}\n{query}\n\n{response}\n")
    _gc()

    response = trim_stop_words(response, stop_words)
    if request.functions:
        choice_data = parse_response(response)
    else:
        plain_message = ChatMessage(role="assistant", content=response)
        choice_data = ChatCompletionResponseChoice(
            index=0,
            message=plain_message,
            finish_reason="stop",
        )
    return ChatCompletionResponse(
        model=request.model, choices=[choice_data], object="chat.completion"
    )
def _dump_json(data: BaseModel, *args, **kwargs) -> str:
try:
return data.model_dump_json(*args, **kwargs)
except AttributeError: # pydantic<2.0.0
return data.json(*args, **kwargs) # noqa
def predict(
    query: str, history: List[List[str]], model_id: str, stop_words: List[str], gen_kwargs: Dict,
):
    """Stream a chat answer as JSON-encoded completion chunks.

    Uses the module-level ``model`` and ``tokenizer``.

    Args:
        query: The user query.
        history: Earlier ``[user, assistant]`` turns.
        model_id: Model name written into every chunk.
        stop_words: Custom stop words; must be empty in stream mode.
        gen_kwargs: Extra keyword arguments for ``model.chat_stream``.

    Yields:
        A chunk announcing the assistant role, then one chunk per piece of
        newly generated text, then a chunk with ``finish_reason="stop"``, and
        finally the literal string ``"[DONE]"``. All chunks are JSON strings.

    Raises:
        Exception: If ``stop_words`` is non-empty. It is raised after the
            first chunk has been yielded.
    """
    global model, tokenizer

    def _chunk(delta, finish_reason=None):
        # Serialize one stream choice wrapped in a chunk envelope.
        choice_data = ChatCompletionResponseStreamChoice(
            index=0, delta=delta, finish_reason=finish_reason
        )
        chunk = ChatCompletionResponse(
            model=model_id, choices=[choice_data], object="chat.completion.chunk"
        )
        return "{}".format(_dump_json(chunk, exclude_unset=True))

    yield _chunk(DeltaMessage(role="assistant"))

    if stop_words:
        # TODO: It's a little bit tricky to trim stop words in the stream mode.
        # Exception accepts no keyword arguments, so the message is positional.
        raise Exception(
            "Invalid request: custom stop words are not yet supported for stream mode."
        )
    # No custom stop words can reach this point.
    stop_words_ids = None

    current_length = 0
    response_generator = model.chat_stream(
        tokenizer, query, history=history, stop_words_ids=stop_words_ids, **gen_kwargs
    )
    for new_response in response_generator:
        # Only the text added since the last chunk is emitted.
        if len(new_response) == current_length:
            continue
        new_text = new_response[current_length:]
        current_length = len(new_response)
        yield _chunk(DeltaMessage(content=new_text))

    yield _chunk(DeltaMessage(), finish_reason="stop")
    yield "[DONE]"
    _gc()
class QwenChatAsOAI(BaseChatModel):
    """Chat model that runs a local Qwen checkpoint behind an OpenAI-style API.

    Requests are served by the module-level ``create_chat_completion``, which
    reads the loaded model and tokenizer from the module globals ``model``
    and ``tokenizer``. ``__init__`` therefore publishes both objects there.
    """

    def __init__(self, model: str, api_key: str, model_server: str):
        """Load the tokenizer and model from a checkpoint.

        Args:
            model: Checkpoint name or path.
            api_key: Unused; kept for interface compatibility.
            model_server: Unused; kept for interface compatibility.
        """
        self.checkpoint_path = copy.copy(model)
        super().__init__()
        loaded_tokenizer = AutoTokenizer.from_pretrained(
            self.checkpoint_path,
            trust_remote_code=True,
            resume_download=True,
        )
        device_map = "cpu"
        # device_map = "auto"
        loaded_model = AutoModelForCausalLM.from_pretrained(
            self.checkpoint_path,
            device_map=device_map,
            trust_remote_code=True,
            resume_download=True,
        ).eval()
        loaded_model.generation_config = GenerationConfig.from_pretrained(
            self.checkpoint_path,
            trust_remote_code=True,
            resume_download=True,
        )
        self.model = loaded_model
        self._publish_backend(loaded_model, loaded_tokenizer)

    @staticmethod
    def _publish_backend(loaded_model, loaded_tokenizer):
        """Expose the model and tokenizer to the module-level functions."""
        # `model` is a parameter of __init__ and so cannot be declared global
        # there; this helper sets the module globals instead.
        global model, tokenizer
        model = loaded_model
        tokenizer = loaded_tokenizer

    def _chat_stream(
        self,
        messages: List[Dict],
        stop: Optional[List[str]] = None,
    ) -> Iterator[str]:
        """Yield the generated text piece by piece.

        NOTE: the streaming backend rejects custom stop words, so iterating
        with a non-empty ``stop`` raises.
        """
        _request = ChatCompletionRequest(model=self.checkpoint_path,
                                         messages=messages,
                                         stop=stop,
                                         stream=True)
        response = create_chat_completion(_request)
        # TODO: error handling
        for chunk in response:
            # Chunks arrive as JSON strings, ended by a literal "[DONE]".
            # The stream is still consumed to the end so that the generator
            # can finish its cleanup.
            if chunk == "[DONE]":
                continue
            delta = json.loads(chunk)["choices"][0].get("delta") or {}
            content = delta.get("content")
            if content is not None:
                yield content

    def _chat_no_stream(
        self,
        messages: List[Dict],
        stop: Optional[List[str]] = None,
    ) -> str:
        """Return the complete generated answer as one string."""
        _request = ChatCompletionRequest(model=self.checkpoint_path,
                                         messages=messages,
                                         stop=stop,
                                         stream=False)
        response = create_chat_completion(_request)
        # TODO: error handling
        return response.choices[0].message.content

    def chat_with_functions(self,
                            messages: List[Dict],
                            functions: Optional[List[Dict]] = None) -> Dict:
        """Run one turn with optional tools and return the message as a dict.

        An empty ``functions`` list is treated like ``None``.
        """
        _request = ChatCompletionRequest(model=self.checkpoint_path,
                                         messages=messages,
                                         functions=functions or None)
        response = create_chat_completion(_request)
        # TODO: error handling
        message = response.choices[0].message
        # Prefer the pydantic v2 serializer and fall back to the v1 one.
        dump = getattr(message, "model_dump", None) or message.dict
        return dump()
class QwenChatAsOAI1(BaseChatModel):
    """Chat model that sends requests through the ``openai`` client."""

    def __init__(self, model: str, api_key: str, model_server: str):
        """Configure the ``openai`` module and remember the model name.

        Args:
            model: Model name passed to every completion call.
            api_key: API key. When blank, the ``OPENAI_API_KEY`` environment
                variable is used, or ``'EMPTY'`` if that is unset.
            model_server: Base URL of the server. The value ``'openai'``
                (case-insensitive, surrounding whitespace ignored) keeps the
                client's existing base URL.
        """
        super().__init__()
        uses_custom_server = model_server.strip().lower() != 'openai'
        if uses_custom_server:
            openai.api_base = model_server
        explicit_key = api_key.strip()
        if explicit_key:
            openai.api_key = explicit_key
        else:
            openai.api_key = os.getenv('OPENAI_API_KEY', default='EMPTY')
        self.model = model

    def _chat_stream(
        self,
        messages: List[Dict],
        stop: Optional[List[str]] = None,
    ) -> Iterator[str]:
        """Yield the content of every streamed delta that has one."""
        stream = openai.ChatCompletion.create(model=self.model,
                                              messages=messages,
                                              stop=stop,
                                              stream=True)
        # TODO: error handling
        for piece in stream:
            delta = piece.choices[0].delta
            if hasattr(delta, 'content'):
                yield delta.content

    def _chat_no_stream(
        self,
        messages: List[Dict],
        stop: Optional[List[str]] = None,
    ) -> str:
        """Return the content of the first choice of a complete response."""
        completion = openai.ChatCompletion.create(model=self.model,
                                                  messages=messages,
                                                  stop=stop,
                                                  stream=False)
        # TODO: error handling
        first_choice = completion.choices[0]
        return first_choice.message.content

    def chat_with_functions(self,
                            messages: List[Dict],
                            functions: Optional[List[Dict]] = None) -> Dict:
        """Run one turn and return the first choice's message.

        ``functions`` is only sent to the server when it is non-empty.
        """
        call_kwargs = {'model': self.model, 'messages': messages}
        if functions:
            call_kwargs['functions'] = functions
        completion = openai.ChatCompletion.create(**call_kwargs)
        # TODO: error handling
        return completion.choices[0].message