|
import gradio as gr |
|
from gradio_client import Client |
|
from huggingface_hub import InferenceClient |
|
import random |
|
from datetime import datetime |
|
from models import models |
|
# Client for the remote Gradio Space whose "/run_script" endpoint is called by
# get_screenshot() further down in this file.
ss_client = Client("https://omnibus-html-image-current-tab.hf.space/")

# Disabled inline model list, kept as an inert string literal for reference.
# The list actually used by the app is imported from the local `models` module.
'''
models=[
    "google/gemma-7b",
    "google/gemma-7b-it",
    "google/gemma-2b",
    "google/gemma-2b-it",
    "openchat/openchat-3.5-0106",
    "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "JunRyeol/jr_model",
]
'''
|
|
|
|
|
def test_models():
    """Probe every model in ``models`` with a fixed prompt and stream a log.

    For each model an ``InferenceClient`` is created, the prompt
    "What is a cat" is generated with fixed sampling settings, and one log
    entry (``Model``, ``Status``, ``Output``, ``Time`` in seconds) is appended.

    Yields:
        list[dict]: the cumulative log after each model has been tried, so
        the bound ``gr.JSON`` output refreshes progressively.
    """
    # Loop-invariant sampling settings, built once instead of once per model.
    generate_kwargs = {
        "temperature": 0.9,
        "max_new_tokens": 128,
        "top_p": 0.9,
        "repetition_penalty": 1.0,
        "do_sample": True,
        "seed": 111111111,
    }

    log_box = []
    for model in models:
        start_time = datetime.now()
        print(f'trying: {model}\n')
        try:
            client = InferenceClient(model)
            stream = client.text_generation(
                "What is a cat",
                **generate_kwargs,
                stream=True,
                details=True,
                return_full_text=True,
            )
            output = "".join(response.token.text for response in stream)
            print(output)
            status = "Success"
        except Exception as e:
            # Deliberate catch-all: one failing model must not abort the
            # whole sweep. The exception is stored as text because the log is
            # rendered by a JSON component and exception objects are not
            # JSON-serialisable.
            status = "Error"
            output = f"{type(e).__name__}: {e}"

        count = (datetime.now() - start_time).total_seconds()
        log = {"Model": model, "Status": status, "Output": output, "Time": count}
        print(f'{log}\n')
        log_box.append(log)
        yield log_box
|
|
|
def format_prompt_default(message, history, cust_p):
    """Build a plain newline-separated prompt.

    Each past exchange contributes "<user>\\n<bot>\\n"; the current message is
    then substituted for the ``USER_INPUT`` placeholder in ``cust_p`` and
    appended.

    Args:
        message: text of the current user turn.
        history: iterable of ``(user_prompt, bot_response)`` pairs, or a
            falsy value for no history.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.

    Returns:
        The assembled prompt string.
    """
    transcript = ""
    for user_turn, bot_turn in history or ():
        for piece in (user_turn, bot_turn):
            transcript += f"{piece}\n"
            # Trace of the prompt as it grows, emitted after every piece.
            print(transcript)
    return transcript + cust_p.replace("USER_INPUT", message)
|
|
|
def format_prompt_gemma(message, history, cust_p):
    """Build a prompt using ``<start_of_turn>`` / ``<end_of_turn>`` markers.

    Past exchanges are rendered as a ``user`` turn followed by a ``model``
    turn; the current message is substituted for ``USER_INPUT`` in ``cust_p``
    and appended.

    Args:
        message: text of the current user turn.
        history: iterable of ``(user_prompt, bot_response)`` pairs, or a
            falsy value for no history.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.

    Returns:
        The assembled prompt string.
    """
    turns = []
    for user_turn, bot_turn in history or ():
        turns.append(f"<start_of_turn>user{user_turn}<end_of_turn>")
        turns.append(f"<start_of_turn>model{bot_turn}<end_of_turn>")
    prompt = "".join(turns)

    # Optional trace of the rendered history (before the current turn is added).
    if VERBOSE:
        print(prompt)

    return prompt + cust_p.replace("USER_INPUT", message)
|
def format_prompt_openc(message, history, cust_p):
    """Build a prompt in the "GPT4 Correct" chat layout.

    Every past exchange is rendered as
    ``GPT4 Correct User: <user><|end_of_turn|>GPT4 Correct Assistant: <bot><|end_of_turn|>``.
    Previously the user role prefix was missing from history turns, so they
    did not match the layout used for the current turn by the template that
    ``load_models`` supplies for openchat models.

    Args:
        message: text of the current user turn.
        history: iterable of ``(user_prompt, bot_response)`` pairs, or a
            falsy value for no history.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.

    Returns:
        The assembled prompt string.
    """
    prompt = ""
    for user_prompt, bot_response in history or ():
        prompt += (
            f"GPT4 Correct User: {user_prompt}<|end_of_turn|>"
            f"GPT4 Correct Assistant: {bot_response}<|end_of_turn|>"
        )
        # Debug trace retained from the original implementation.
        print(prompt)
    return prompt + cust_p.replace("USER_INPUT", message)
|
|
|
def format_prompt_mixtral(message, history, cust_p):
    """Build a prompt using ``<s>`` / ``[INST] ... [/INST]`` markers.

    The prompt always starts with ``<s>``. Each past exchange is rendered as
    ``[INST] <user> [/INST] <bot></s> ``; the current message is substituted
    for ``USER_INPUT`` in ``cust_p`` and appended.

    Args:
        message: text of the current user turn.
        history: iterable of ``(user_prompt, bot_response)`` pairs, or a
            falsy value for no history.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.

    Returns:
        The assembled prompt string.
    """
    past_turns = "".join(
        f"[INST] {user_turn} [/INST] {bot_turn}</s> "
        for user_turn, bot_turn in (history or ())
    )
    current_turn = cust_p.replace("USER_INPUT", message)
    return "<s>" + past_turns + current_turn
|
|
|
def format_prompt_choose(message, history, cust_p, model_name):
    """Pick the prompt formatter that matches the selected model.

    ``model_name`` is used as an index/key into ``models``; the resulting
    identifier is matched case-insensitively against "gemma", "mixtral" and
    "openchat" (in that order). Anything else uses the default formatter.

    Args:
        message: text of the current user turn.
        history: past ``(user_prompt, bot_response)`` pairs.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.
        model_name: index/key of the selected model in ``models``.

    Returns:
        The prompt string produced by the chosen formatter.
    """
    model_id = models[model_name].lower()
    formatters = (
        ("gemma", format_prompt_gemma),
        ("mixtral", format_prompt_mixtral),
        ("openchat", format_prompt_openc),
    )
    for keyword, formatter in formatters:
        if keyword in model_id:
            return formatter(message, history, cust_p)
    return format_prompt_default(message, history, cust_p)
|
|
|
def load_models(inp):
    """Create a client for the selected model and pick its prompt template.

    Args:
        inp: index/key of the selected model in ``models``.

    Returns:
        A 3-tuple ``(out_box, prompt_out, model_state)``: a ``gr.update`` that
        sets the label to the model id, the prompt template (with a
        ``USER_INPUT`` placeholder) for that model family, and the new
        ``InferenceClient``.
    """
    print(type(inp))
    print(inp)
    model_id = models[inp]
    print(model_id)

    model_state = InferenceClient(model_id)
    out_box = gr.update(label=model_id)

    # Template selection mirrors the family checks in format_prompt_choose.
    lowered = model_id.lower()
    if "gemma" in lowered:
        prompt_out = "<start_of_turn>userUSER_INPUT<end_of_turn><start_of_turn>model"
    elif "mixtral" in lowered:
        prompt_out = "[INST] USER_INPUT [/INST]"
    elif "openchat" in lowered:
        prompt_out = "GPT4 Correct User: USER_INPUT<|end_of_turn|>GPT4 Correct Assistant: "
    else:
        prompt_out = "USER_INPUT\n"
    return out_box, prompt_out, model_state
|
|
|
|
|
# Module-wide switch for the optional debug prints in the functions of this file.
VERBOSE = False
|
|
|
def load_models_OG(inp):
    """Return a ``gr.update`` that sets the label to the selected model id.

    Args:
        inp: index/key of the selected model in ``models``.

    Returns:
        ``gr.update(label=models[inp])``.
    """
    model_label = None
    if VERBOSE:
        print(type(inp))
        print(inp)
        model_label = models[inp]
        print(model_label)
    if model_label is None:
        model_label = models[inp]
    return gr.update(label=model_label)
|
|
|
def format_prompt(message, history, cust_p):
    """Build a ``<start_of_turn>``-style prompt from history and a template.

    Produces the same layout as ``format_prompt_gemma``: one ``user`` turn and
    one ``model`` turn per past exchange, followed by ``cust_p`` with
    ``USER_INPUT`` replaced by ``message``.

    Args:
        message: text of the current user turn.
        history: iterable of ``(user_prompt, bot_response)`` pairs, or a
            falsy value for no history.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.

    Returns:
        The assembled prompt string.
    """
    rendered = ""
    if history:
        for asked, answered in history:
            rendered += (
                f"<start_of_turn>user{asked}<end_of_turn>"
                f"<start_of_turn>model{answered}<end_of_turn>"
            )

    # Optional trace of the rendered history (before the current turn is added).
    if VERBOSE:
        print(rendered)

    rendered += cust_p.replace("USER_INPUT", message)
    return rendered
|
|
|
def _recent_memory(memory, chat_mem):
    """Return the last ``chat_mem`` exchanges of ``memory`` (none if <= 0)."""
    keep = int(chat_mem or 0)
    if keep <= 0:
        # ``memory[-0:]`` would be the whole list, so zero needs its own branch.
        return []
    return memory[-keep:]


def chat_inf(system_prompt,prompt,history,memory,model_state,model_name,seed,temp,tokens,top_p,rep_p,chat_mem,cust_p):
    """Stream one chat completion and update the chat history and memory.

    Args:
        system_prompt: optional system text; when non-empty it is prepended to
            the user prompt as ``"<system_prompt>, <prompt>"``.
        prompt: the current user message.
        history: list of exchanges currently shown in the chatbot (or falsy).
        memory: list of ``(prompt, output)`` pairs used as model context
            (or falsy).
        model_state: client object exposing ``text_generation``; when ``None``
            a new ``InferenceClient`` is created for the selected model.
        model_name: index/key of the selected model in ``models``.
        seed, temp, tokens, top_p, rep_p: sampling settings forwarded to
            ``text_generation``.
        chat_mem: number of most recent memory entries to include as context.
        cust_p: prompt template containing the ``USER_INPUT`` placeholder.

    Yields:
        ``(history, memory)`` tuples: intermediate ones while tokens stream in
        (existing history plus the partial answer), then the final state.
    """
    # Text inputs can arrive as None (e.g. after the Clear button resets them).
    system_prompt = system_prompt or ""
    prompt = prompt or ""
    history = history or []
    memory = memory or []

    if VERBOSE:
        print(model_state)

    client = model_state
    if client is None:
        # No client was stored in the session state; build one on demand.
        client = InferenceClient(models[model_name])

    recent = _recent_memory(memory, chat_mem)
    hist_len = sum(len(str(ea)) for ea in recent)
    # NOTE: lengths here are character counts, used as a rough stand-in for
    # tokens when compared with the 8000 budget below.
    in_len = len(system_prompt + prompt) + hist_len

    if (in_len + tokens) > 8000:
        history.append((prompt,"Wait, that's too many tokens, please reduce the 'Chat Memory' value, or reduce the 'Max new tokens' value"))
        yield history, memory
    else:
        generate_kwargs = dict(
            temperature=temp,
            max_new_tokens=tokens,
            top_p=top_p,
            repetition_penalty=rep_p,
            do_sample=True,
            seed=seed,
        )
        message = f"{system_prompt}, {prompt}" if system_prompt else prompt
        formatted_prompt = format_prompt_choose(message, recent, cust_p, model_name)
        stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=True)
        output = ""
        for response in stream:
            output += response.token.text
            # Keep earlier exchanges visible while the new answer streams in.
            yield history + [(prompt, output)], memory
        history.append((prompt, output))
        memory.append((prompt, output))
        yield history, memory

    if VERBOSE:
        print("\n######### HIST "+str(in_len))
        print("\n######### TOKENS "+str(tokens))
|
|
|
def get_screenshot(chat: list,height=5000,width=600,chatblock=None,theme="light",wait=3000,header=True):
    """Ask the remote screenshot Space to render the chat and return its URL.

    Args:
        chat: chat content; sent to the remote endpoint as ``str(chat)``.
        height: forwarded to the remote endpoint.
        width: forwarded to the remote endpoint.
        chatblock: optional selection of chat blocks; ``None`` is treated as
            an empty list. When non-empty, element 3 of the remote result is
            used instead of element 0.
        theme: forwarded to the remote endpoint.
        wait: forwarded to the remote endpoint.
        header: forwarded to the remote endpoint.

    Returns:
        URL string pointing at the file reported by the remote endpoint.
    """
    # A ``None`` default replaces the former shared mutable ``[]`` default.
    if chatblock is None:
        chatblock = []
    print(chatblock)

    # Which element of the remote result holds the wanted file path.
    tog = 3 if chatblock else 0
    result = ss_client.predict(str(chat),height,width,chatblock,header,theme,wait,api_name="/run_script")
    out = f'https://omnibus-html-image-current-tab.hf.space/file={result[tog]}'
    print(out)
    return out
|
|
|
def clear_fn():
    """Return four ``None`` values, one per output bound to the Clear button."""
    return (None,) * 4
|
# Initial value for the "Seed" slider, drawn once when the module is loaded.
rand_val = random.randint(1, 1111111111111111)
|
|
|
def check_rand(inp, val):
    """Return a "Seed" slider holding either a fresh random seed or ``val``.

    Args:
        inp: the "Random Seed" flag; when equal to ``True`` a new random seed
            is drawn.
        val: current seed value, converted with ``int`` when it is kept.

    Returns:
        A ``gr.Slider`` configured with the chosen seed value.
    """
    # The equality test (rather than truthiness) is kept on purpose so the
    # branch taken for non-bool inputs is unchanged.
    if inp == True:  # noqa: E712
        seed_value = random.randint(1, 1111111111111111)
    else:
        seed_value = int(val)
    return gr.Slider(label="Seed", minimum=1, maximum=1111111111111111, value=seed_value)
|
|
|
# Build the Gradio UI, wire its events, and start the server.
# NOTE(review): the container nesting below is reconstructed from statement
# order — confirm the layout against the running app.
with gr.Blocks() as app:
    # Session state: the active inference client and the chat memory list.
    model_state = gr.State()
    memory = gr.State()

    gr.HTML("""<center><h1 style='font-size:xx-large;'>Huggingface Hub InferenceClient</h1><br><h3>Chatbot's</h3></center>""")
    chat_b = gr.Chatbot(height=500)

    with gr.Group():
        with gr.Row():
            # Left column: prompts, prompt template, action buttons, model picker.
            with gr.Column(scale=3):
                inp = gr.Textbox(label="Prompt")
                sys_inp = gr.Textbox(label="System Prompt (optional)")
                with gr.Accordion("Prompt Format", open=False):
                    custom_prompt = gr.Textbox(
                        label="Modify Prompt Format",
                        info="For testing purposes. 'USER_INPUT' is where 'SYSTEM_PROMPT, PROMPT' will be placed",
                        lines=3,
                        value="<start_of_turn>userUSER_INPUT<end_of_turn><start_of_turn>model",
                    )
                with gr.Row():
                    with gr.Column(scale=2):
                        btn = gr.Button("Chat")
                    with gr.Column(scale=1):
                        with gr.Group():
                            stop_btn = gr.Button("Stop")
                            clear_btn = gr.Button("Clear")
                            test_btn = gr.Button("Test")
                # type='index' makes the callbacks receive the position in `models`.
                client_choice = gr.Dropdown(
                    label="Models",
                    type='index',
                    choices=list(models),
                    value=models[0],
                    interactive=True,
                )
            # Right column: sampling controls.
            with gr.Column(scale=1):
                with gr.Group():
                    rand = gr.Checkbox(label="Random Seed", value=True)
                    seed = gr.Slider(
                        label="Seed",
                        minimum=1,
                        maximum=1111111111111111,
                        step=1,
                        value=rand_val,
                    )
                    tokens = gr.Slider(
                        label="Max new tokens",
                        value=1600,
                        minimum=0,
                        maximum=8000,
                        step=64,
                        interactive=True,
                        visible=True,
                        info="The maximum number of tokens",
                    )
                    temp = gr.Slider(label="Temperature", step=0.01, minimum=0.01, maximum=1.0, value=0.49)
                    top_p = gr.Slider(label="Top-P", step=0.01, minimum=0.01, maximum=1.0, value=0.49)
                    rep_p = gr.Slider(label="Repetition Penalty", step=0.01, minimum=0.1, maximum=2.0, value=0.99)
                    chat_mem = gr.Number(label="Chat Memory", info="Number of previous chats to retain", value=4)

    with gr.Accordion(label="Screenshot", open=False):
        with gr.Row():
            with gr.Column(scale=3):
                im_btn = gr.Button("Screenshot")
                img = gr.Image(type='filepath')
            with gr.Column(scale=1):
                with gr.Row():
                    im_height = gr.Number(label="Height", value=5000)
                    im_width = gr.Number(label="Width", value=500)
                wait_time = gr.Number(label="Wait Time", value=3000)
                theme = gr.Radio(label="Theme", choices=["light", "dark"], value="light")
                chatblock = gr.Dropdown(
                    label="Chatblocks",
                    info="Choose specific blocks of chat",
                    choices=list(range(1, 40)),
                    multiselect=True,
                )

    test_json = gr.JSON(label="Test Output")
    test_btn.click(fn=test_models, inputs=None, outputs=test_json)

    # (Re)load the inference client on model change and on initial page load.
    model_outputs = [chat_b, custom_prompt, model_state]
    client_choice.change(fn=load_models, inputs=client_choice, outputs=model_outputs)
    app.load(fn=load_models, inputs=client_choice, outputs=model_outputs)

    im_go = im_btn.click(
        fn=get_screenshot,
        inputs=[chat_b, im_height, im_width, chatblock, theme, wait_time],
        outputs=img,
    )

    # Enter in the prompt box and the Chat button run the same pipeline:
    # refresh the seed first, then stream the chat reply.
    chat_inputs = [sys_inp, inp, chat_b, memory, model_state, client_choice,
                   seed, temp, tokens, top_p, rep_p, chat_mem, custom_prompt]
    chat_outputs = [chat_b, memory]
    chat_sub = inp.submit(fn=check_rand, inputs=[rand, seed], outputs=seed).then(
        fn=chat_inf, inputs=chat_inputs, outputs=chat_outputs
    )
    go = btn.click(fn=check_rand, inputs=[rand, seed], outputs=seed).then(
        fn=chat_inf, inputs=chat_inputs, outputs=chat_outputs
    )

    # Stop cancels any running chat or screenshot job.
    stop_btn.click(fn=None, inputs=None, outputs=None, cancels=[go, im_go, chat_sub])
    clear_btn.click(fn=clear_fn, inputs=None, outputs=[inp, sys_inp, chat_b, memory])

app.queue(default_concurrency_limit=10).launch()