# pylint: disable=line-too-long, broad-exception-caught, invalid-name, missing-function-docstring, too-many-instance-attributes, missing-class-docstring
# ruff: noqa: E501
import os # 导入os模块
import platform # 导入platform模块
import random # 导入random模块
import time # 导入time模块
from dataclasses import asdict, dataclass # 从dataclasses模块中导入asdict和dataclass
from pathlib import Path # 从pathlib模块中导入Path类
# from types import SimpleNamespace # 从types模块中导入SimpleNamespace类,但未使用
import gradio as gr #导入gradio模块并起别名gr
import psutil #导入psutil模块
import getpass #导入 getpass模块
from about_time import about_time # 从about_time模块中导入about_time函数
from ctransformers import AutoModelForCausalLM # 从ctransformers模块中导入AutoModelForCausalLM类
from dl_hf_model import dl_hf_model # 从dl_hf_model模块中导入dl_hf_model函数
from loguru import logger # 从loguru模块中导入logger
filename_list = [ # 定义文件名列表
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q2_K.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q3_K_L.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q3_K_M.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q3_K_S.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_0.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_1.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_K_M.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_K_S.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q5_0.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q5_1.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q5_K_M.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q5_K_S.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q6_K.bin",
"Wizard-Vicuna-7B-Uncensored.ggmlv3.q8_0.bin",
]
URL = "https://huggingface.co/TheBloke/Wizard-Vicuna-7B-Uncensored-GGML/raw/main/Wizard-Vicuna-7B-Uncensored.ggmlv3.q4_K_M.bin" # 4.05G
#url = "https://huggingface.co/savvamadar/ggml-gpt4all-j-v1.3-groovy/blob/main/ggml-gpt4all-j-v1.3-groovy.bin"
url = "https://huggingface.co/TheBloke/Llama-2-13B-GGML/blob/main/llama-2-13b.ggmlv3.q4_K_S.bin" # 7.37G
url = "https://huggingface.co/TheBloke/Llama-2-13B-chat-GGML/blob/main/llama-2-13b-chat.ggmlv3.q3_K_L.bin"
url = "https://huggingface.co/TheBloke/Llama-2-13B-chat-GGML/blob/main/llama-2-13b-chat.ggmlv3.q3_K_L.bin" # 6.93G
url = "https://huggingface.co/TheBloke/Llama-2-13B-chat-GGML/blob/main/llama-2-13b-chat.ggmlv3.q3_K_L.binhttps://huggingface.co/TheBloke/Llama-2-13B-chat-GGML/blob/main/llama-2-13b-chat.ggmlv3.q4_K_M.bin" # 7.87G
url = "https://huggingface.co/localmodels/Llama-2-13B-Chat-ggml/blob/main/llama-2-13b-chat.ggmlv3.q4_K_S.bin" # 7.37G
_ = ( # 定义一个判断是否在特定环境的标志
"golay" in platform.node()
or "okteto" in platform.node()
or Path("/kaggle").exists()
# or psutil.cpu_count(logical=False) < 4
or 1 # run 7b in hf
)
if _: # 如果在特定环境中
url = "https://huggingface.co/TheBloke/Llama-2-13B-chat-GGML/blob/main/llama-2-13b-chat.ggmlv3.q2_K.bin"
# url = "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGML/blob/main/llama-2-7b-chat.ggmlv3.q2_K.bin" # 2.87G
# url = "https://huggingface.co/TheBloke/Llama-2-7B-Chat-GGML/blob/main/llama-2-7b-chat.ggmlv3.q4_K_M.bin" # 2.87G
prompt_template = """[INST] <>
You are a cute kitten and I am your owner.
<>
{question} [/INST]
"""
_ = psutil.cpu_count(logical=False) - 1 # 获取CPU物理核心数减1
cpu_count: int = int(_) if _ else 1 # 如果上一步结果小于0则为1
logger.debug(f"{cpu_count=}") # 打印CPU核心数
LLM = None # 声明LLM变量
try:
model_loc, file_size = dl_hf_model(url) # 从url下载模型到本地
except Exception as exc_:
logger.error(exc_) # 打印错误
raise SystemExit(1) from exc_ # 如果下载失败则退出
LLM = AutoModelForCausalLM.from_pretrained( # 初始化LLM模型
model_loc,
model_type="llama",
# threads=cpu_count,
)
logger.info(f"done load llm {model_loc=} {file_size=}G") # 打印加载模型信息
os.environ["TZ"] = "Asia/Shanghai" # 设置时区为上海
try:
time.tzset() # type: ignore # pylint: disable=no-member # 尝试应用时区设置
except Exception:
# Windows
logger.warning("Windows, cant run time.tzset()") # windows不支持tzset打印提示
_ = """
ns = SimpleNamespace(
response="",
generator=(_ for _ in []),
)
# """
@dataclass # 定义数据类
class GenerationConfig:
temperature: float = 0.7
top_k: int = 50
top_p: float = 0.9
repetition_penalty: float = 1.0
max_new_tokens: int = 512
seed: int = 42
reset: bool = False
stream: bool = True
# threads: int = cpu_count
# stop: list[str] = field(default_factory=lambda: [stop_string])
def generate( # 定义生成函数
question: str,
llm=LLM,
config: GenerationConfig = GenerationConfig(),
):
"""Run model inference, will return a Generator if streaming is true."""
# _ = prompt_template.format(question=question)
# print(_)
prompt = prompt_template.format(question=question) # 填充prompt
return llm( # 调用LLM模型
prompt,
**asdict(config),
)
logger.debug(f"{asdict(GenerationConfig())=}") # 打印默认生成配置
def user(user_message, history): # 定义user函数处理用户输入
# return user_message, history + [[user_message, None]]
history.append([user_message, None]) # 在history中追加用户输入
return user_message, history # keep user_message
def user1(user_message, history): # 定义user1函数处理用户输入
# return user_message, history + [[user_message, None]]
history.append([user_message, None]) # 在history中追加用户输入
return "", history # clear user_message
def bot_(history): # 定义bot_函数生成回复
user_message = history[-1][0]
resp = random.choice(["How are you?", "I love you", "I'm very hungry"])
bot_message = user_message + ": " + resp
history[-1][1] = ""
for character in bot_message:
history[-1][1] += character
time.sleep(0.02)
yield history
history[-1][1] = resp
yield history
def bot(history): # 定义bot函数生成回复
user_message = history[-1][0]
response = []
logger.debug(f"{user_message=}")
with about_time() as atime: # type: ignore # 测量生成用时
flag = 1
prefix = ""
then = time.time()
logger.debug("about to generate")
config = GenerationConfig(reset=True) # 配置生成参数
for elm in generate(user_message, config=config): # 生成回复
if flag == 1:
logger.debug("in the loop")
prefix = f"({time.time() - then:.2f}s) "
flag = 0
print(prefix, end="", flush=True)
logger.debug(f"{prefix=}")
print(elm, end="", flush=True)
# logger.debug(f"{elm}")
response.append(elm)
history[-1][1] = prefix + "".join(response) # 拼接前缀和生成内容到回复中
yield history
_ = (
f"(time elapsed: {atime.duration_human}, " # type: ignore # 生成用时信息
f"{atime.duration/len(''.join(response)):.2f}s/char)" # type: ignore
)
history[-1][1] = "".join(response) + f"\n{_}" # 拼接生成内容和用时信息为最终回复
yield history
def predict_api(prompt): # 定义预测API函数
logger.debug(f"{prompt=}")
try:
# user_prompt = prompt
config = GenerationConfig( # 配置生成参数
temperature=0.7,
top_k=10,
top_p=0.9,
repetition_penalty=1.0,
max_new_tokens=512, # adjust as needed
seed=42,
reset=True, # reset history (cache)
stream=False,
# threads=cpu_count,
# stop=prompt_prefix[1:2],
)
response = generate( # 生成回复
prompt,
config=config,
)
logger.debug(f"api: {response=}")
except Exception as exc:
logger.error(exc)
response = f"{exc=}"
# bot = {"inputs": [response]}
# bot = [(prompt, response)]
return response
css = """ # 定义css样式
.importantButton {
background: linear-gradient(45deg, #7e0570,#5d1c99, #6e00ff) !important;
border: none !important;
}
.importantButton:hover {
background: linear-gradient(45deg, #ff00e0,#8500ff, #6e00ff) !important;
border: none !important;
}
.disclaimer {font-variant-caps: all-small-caps; font-size: xx-small;}
.xsmall {font-size: x-small;}
"""
etext = """In America, where cars are an important part of the national psyche, a decade ago people had suddenly started to drive less, which had not happened since the oil shocks of the 1970s. """
examples_list = [ # 定义示例输入列表
["Hi, what are you doing?"],
[
"Hello."
]
]
logger.info("start block")
with gr.Blocks( # 使用gradio构建界面
title=f"{Path(model_loc).name}",
theme=gr.themes.Soft(text_size="sm", spacing_size="sm"),
css=css,
) as block:
# buff_var = gr.State("")
with gr.Accordion("🎈 Info", open=False): # 折叠面板显示模型信息
# gr.HTML(
# """
and spin a CPU UPGRADE to avoid the queue"""
# )
gr.Markdown(
f"""{Path(model_loc).name}
超级小猫使用LLaMA-2-13b-chat,调用16G的CPU运行,速度比较慢,请见谅。模型数据主要为英文,建议使用英文进行问答""",
elem_classes="xsmall",
)
# chatbot = gr.Chatbot().style(height=700) # 500
chatbot = gr.Chatbot(height=500) # 聊天界面
# buff = gr.Textbox(show_label=False, visible=True)
with gr.Row(): # 输入区域
with gr.Column(scale=5):
msg = gr.Textbox(
label="Chat Message Box",
placeholder="Ask me anything (press Shift+Enter or click Submit to send)",
show_label=False,
# container=False,
lines=6,
max_lines=30,
show_copy_button=True,
# ).style(container=False)
)
with gr.Column(scale=1, min_width=50):
with gr.Row():
submit = gr.Button("发送", elem_classes="xsmall") # 提交按钮
stop = gr.Button("停止", visible=True) # 停止按钮
clear = gr.Button("清除历史会话", visible=True) # 清空历史按钮
with gr.Accordion("Example Inputs", open=True): # 示例输入面板
examples = gr.Examples(
examples=examples_list,
inputs=[msg],
examples_per_page=40,
)
# with gr.Row():
with gr.Accordion("Disclaimer", open=False): # 免责声明面板
_ = Path(model_loc).name
gr.Markdown(
"免责声明:超级小猫(POWERED BY LLAMA 2) 可能会产生与事实不符的输出,不应依赖它来产生 "
"事实准确的信息。超级小猫(POWERED BY LLAMA 2) 是在各种公共数据集上进行训练的;虽然已尽 "
"已尽力清理预训练数据,但该模型仍有可能产生不良内容,"
"有偏见或其他冒犯性的输出",
elem_classes=["disclaimer"],
)
msg_submit_event = msg.submit( # 提交事件绑定user函数和bot函数
# fn=conversation.user_turn,
fn=user,
inputs=[msg, chatbot],
outputs=[msg, chatbot],
queue=True,
show_progress="full",
# api_name=None,
).then(bot, chatbot, chatbot, queue=True)
submit_click_event = submit.click( # 点击提交按钮事件,绑定user1函数清空输入和bot函数
# fn=lambda x, y: ("",) + user(x, y)[1:], # clear msg
fn=user1, # clear msg
inputs=[msg, chatbot],
outputs=[msg, chatbot],
queue=True,
# queue=False,
show_progress="full",
# api_name=None,
).then(bot, chatbot, chatbot, queue=True)
stop.click( # 点击停止按钮清空队列
fn=None,
inputs=None,
outputs=None,
cancels=[msg_submit_event, submit_click_event],
queue=False,
)
clear.click(lambda: None, None, chatbot, queue=False) # 点击清空历史按钮
with gr.Accordion("For Chat/Translation API", open=False, visible=False): # API调用面板
input_text = gr.Text()
api_btn = gr.Button("Go", variant="primary")
out_text = gr.Text()
api_btn.click( # 绑定API调用逻辑
predict_api,
input_text,
out_text,
api_name="api",
)
# block.load(update_buff, [], buff, every=1)
# block.load(update_buff, [buff_var], [buff_var, buff], every=1)
# concurrency_count=5, max_size=20
# max_size=36, concurrency_count=14
# CPU cpu_count=2 16G, model 7G
# CPU UPGRADE cpu_count=8 32G, model 7G
# does not work
_ = """
# _ = int(psutil.virtual_memory().total / 10**9 // file_size - 1)
# concurrency_count = max(_, 1)
if psutil.cpu_count(logical=False) >= 8:
# concurrency_count = max(int(32 / file_size) - 1, 1)
else:
# concurrency_count = max(int(16 / file_size) - 1, 1)
# """
concurrency_count = 1 # 并发数设置为1
logger.info(f"{concurrency_count=}")
block.queue(concurrency_count=concurrency_count, max_size=5).launch(debug=True) # 启动服务器