HFLLMAPI / networks /openai_streamer.py
Shuddho's picture
Update networks/openai_streamer.py
8c0afa8 verified
import copy
import json
import re
import tiktoken
import uuid
from curl_cffi import requests
from tclogger import logger
from constants.envs import PROXIES
from constants.headers import OPENAI_GET_HEADERS, OPENAI_POST_DATA
from constants.models import TOKEN_LIMIT_MAP, TOKEN_RESERVED
from messagers.message_outputer import OpenaiStreamOutputer
from networks.proof_worker import ProofWorker
class OpenaiRequester:
def __init__(self):
self.init_requests_params()
def init_requests_params(self):
self.api_base = "https://chat.openai.com/backend-anon"
self.api_me = f"{self.api_base}/me"
self.api_models = f"{self.api_base}/models"
self.api_chat_requirements = f"{self.api_base}/sentinel/chat-requirements"
self.api_conversation = f"{self.api_base}/conversation"
self.uuid = str(uuid.uuid4())
self.requests_headers = copy.deepcopy(OPENAI_GET_HEADERS)
extra_headers = {
"Oai-Device-Id": self.uuid,
}
self.requests_headers.update(extra_headers)
def log_request(self, url, method="GET"):
logger.note(f"> {method}:", end=" ")
logger.mesg(f"{url}", end=" ")
def log_response(
self, res: requests.Response, stream=False, iter_lines=False, verbose=False
):
status_code = res.status_code
status_code_str = f"[{status_code}]"
if status_code == 200:
logger_func = logger.success
else:
logger_func = logger.warn
logger_func(status_code_str)
logger.enter_quiet(not verbose)
if stream:
if not iter_lines:
return
if not hasattr(self, "content_offset"):
self.content_offset = 0
for line in res.iter_lines():
line = line.decode("utf-8")
line = re.sub(r"^data:\s*", "", line)
if re.match(r"^\[DONE\]", line):
logger.success("\n[Finished]")
break
line = line.strip()
print(line)
if line:
try:
data = json.loads(line, strict=False)
message_role = data["message"]["author"]["role"]
message_status = data["message"]["status"]
if (
message_role == "assistant"
and message_status == "in_progress"
):
content = data["message"]["content"]["parts"][0]
delta_content = content[self.content_offset :]
self.content_offset = len(content)
logger_func(delta_content, end="")
except Exception as e:
logger.warn(e)
else:
logger_func(res.json())
logger.exit_quiet(not verbose)
def get_models(self):
self.log_request(self.api_models)
res = requests.get(
self.api_models,
headers=self.requests_headers,
timeout=10,
)
self.log_response(res)
def auth(self):
self.log_request(self.api_chat_requirements, method="POST")
res = requests.post(
self.api_chat_requirements,
headers=self.requests_headers,
timeout=10,
)
data = res.json()
self.chat_requirements_token = data["token"]
self.chat_requirements_seed = data["proofofwork"]["seed"]
self.chat_requirements_difficulty = data["proofofwork"]["difficulty"]
self.log_response(res)
def transform_messages(self, messages: list[dict]):
def get_role(role):
if role in ["system", "user", "assistant"]:
return role
else:
return "system"
new_messages = [
{
"author": {"role": get_role(message["role"])},
"content": {"content_type": "text", "parts": [message["content"]]},
"metadata": {},
}
for message in messages
]
return new_messages
def chat_completions(self, messages: list[dict], iter_lines=False, verbose=False):
proof_token = ProofWorker().calc_proof_token(
self.chat_requirements_seed, self.chat_requirements_difficulty
)
extra_headers = {
"Accept": "text/event-stream",
"Openai-Sentinel-Chat-Requirements-Token": self.chat_requirements_token,
"Openai-Sentinel-Proof-Token": proof_token,
}
requests_headers = copy.deepcopy(self.requests_headers)
requests_headers.update(extra_headers)
post_data = copy.deepcopy(OPENAI_POST_DATA)
extra_data = {
"messages": self.transform_messages(messages),
"websocket_request_id": str(uuid.uuid4()),
}
post_data.update(extra_data)
self.log_request(self.api_conversation, method="POST")
s = requests.Session()
res = s.post(
self.api_conversation,
headers=requests_headers,
json=post_data,
timeout=10,
stream=True,
)
self.log_response(res, stream=True, iter_lines=iter_lines, verbose=verbose)
return res
class OpenaiStreamer:
def __init__(self):
self.model = "gpt-3.5-turbo"
self.message_outputer = OpenaiStreamOutputer(
owned_by="openai", model="gpt-3.5-turbo"
)
self.tokenizer = tiktoken.get_encoding("cl100k_base")
def count_tokens(self, messages: list[dict]):
token_count = sum(
len(self.tokenizer.encode(message["content"])) for message in messages
)
logger.note(f"Prompt Token Count: {token_count}")
return token_count
def check_token_limit(self, messages: list[dict]):
token_limit = TOKEN_LIMIT_MAP[self.model]
token_count = self.count_tokens(messages)
token_redundancy = int(token_limit - TOKEN_RESERVED - token_count)
if token_redundancy <= 0:
raise ValueError(
f"Prompt exceeded token limit: {token_count} > {token_limit}"
)
return True
def chat_response(self, messages: list[dict], iter_lines=False, verbose=False):
self.check_token_limit(messages)
logger.enter_quiet(not verbose)
requester = OpenaiRequester()
requester.auth()
logger.exit_quiet(not verbose)
return requester.chat_completions(
messages=messages, iter_lines=iter_lines, verbose=verbose
)
def chat_return_generator(self, stream_response: requests.Response, verbose=False):
content_offset = 0
is_finished = False
for line in stream_response.iter_lines():
line = line.decode("utf-8")
line = re.sub(r"^data:\s*", "", line)
line = line.strip()
print(line)
if not line:
continue
if re.match(r"^\[DONE\]", line):
content_type = "Finished"
delta_content = ""
logger.success("\n[Finished]")
is_finished = True
else:
print(line)
content_type = "Completions"
delta_content = ""
try:
data = json.loads(line, strict=False)
message_role = data["message"]["author"]["role"]
message_status = data["message"]["status"]
if message_role == "assistant" and message_status == "in_progress":
content = data["message"]["content"]["parts"][0]
if not len(content):
continue
delta_content = content[content_offset:]
content_offset = len(content)
if verbose:
logger.success(delta_content, end="")
else:
continue
except Exception as e:
logger.warn(e)
output = self.message_outputer.output(
content=delta_content, content_type=content_type
)
yield output
if not is_finished:
yield self.message_outputer.output(content="", content_type="Finished")
def chat_return_dict(self, stream_response: requests.Response):
final_output = self.message_outputer.default_data.copy()
final_output["choices"] = [
{
"index": 0,
"finish_reason": "stop",
"message": {"role": "assistant", "content": ""},
}
]
final_content = ""
for item in self.chat_return_generator(stream_response):
print(item)
try:
data = json.loads(item)
delta = data["choices"][0]["delta"]
delta_content = delta.get("content", "")
if delta_content:
final_content += delta_content
except Exception as e:
logger.warn(e)
final_output["choices"][0]["message"]["content"] = final_content.strip()
return final_output
if __name__ == "__main__":
streamer = OpenaiStreamer()
messages = [
{
"role": "system",
"content": "You are an LLM developed by NiansuhAI.\nYour name is Niansuh-Copilot.",
},
{"role": "user", "content": "Hello, what is your role?"},
{"role": "assistant", "content": "I am an LLM."},
{"role": "user", "content": "What is your name?"},
]
streamer.chat_response(messages=messages, iter_lines=True, verbose=True)
# python -m networks.openai_streamer