Spaces:
Sleeping
Sleeping
from huggingface_hub import InferenceClient | |
import gradio as gr | |
import random | |
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1") | |
from prompts import GAME_MASTER, COMPRESS_HISTORY, ADJUST_STATS | |
def format_prompt(message, history): | |
prompt="" | |
''' | |
prompt = "<s>" | |
for user_prompt, bot_response in history: | |
prompt += f"[INST] {user_prompt} [/INST]" | |
prompt += f" {bot_response}</s> " | |
''' | |
prompt += f"[INST] {message} [/INST]" | |
return prompt | |
temperature=0.9 | |
top_p=0.95 | |
repetition_penalty=1.0 | |
def compress_history(history,temperature=temperature,top_p=top_p,repetition_penalty=repetition_penalty): | |
formatted_prompt=f"{COMPRESS_HISTORY.format(history=history)}" | |
generate_kwargs = dict( | |
temperature=temperature, | |
max_new_tokens=1024, | |
top_p=top_p, | |
repetition_penalty=repetition_penalty, | |
do_sample=True, | |
seed=random.randint(1,99999999999) | |
#seed=42, | |
) | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) | |
output = "" | |
for response in stream: | |
output += response.token.text | |
return output | |
MAX_HISTORY=100 | |
def generate(prompt, history,max_new_tokens,health,temperature=temperature,top_p=top_p,repetition_penalty=repetition_penalty): | |
temperature = float(temperature) | |
if temperature < 1e-2: | |
temperature = 1e-2 | |
top_p = float(top_p) | |
generate_kwargs = dict( | |
temperature=temperature, | |
max_new_tokens=max_new_tokens, | |
top_p=top_p, | |
repetition_penalty=repetition_penalty, | |
do_sample=True, | |
seed=random.randint(1,99999999999) | |
#seed=42, | |
) | |
cnt=0 | |
history1=history | |
for ea in history: | |
print (ea) | |
for l in ea: | |
print (l) | |
cnt+=len(l.split("\n")) | |
print(f'cnt:: {cnt}') | |
if cnt > MAX_HISTORY: | |
history1 = compress_history(str(history), temperature, max_new_tokens, top_p, repetition_penalty) | |
formatted_prompt = format_prompt(f"{GAME_MASTER.format(history=history1)}, {prompt}", history) | |
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False) | |
output = "" | |
for response in stream: | |
output += response.token.text | |
if history: | |
yield "",[(prompt,output)],health | |
else: | |
yield "",[(prompt,output)],health | |
generate_kwargs2 = dict( | |
temperature=temperature, | |
max_new_tokens=256, | |
top_p=top_p, | |
repetition_penalty=repetition_penalty, | |
do_sample=True, | |
seed=random.randint(1,99999999999) | |
#seed=42, | |
) | |
#history="" | |
#formatted_prompt2 = format_prompt(f"{ADJUST_STATS.format(history=output,health=health)}, {prompt}", history) | |
stream2 = client.text_generation(f"{ADJUST_STATS.format(history=output,health=health)}", **generate_kwargs2, stream=True, details=True, return_full_text=False) | |
output2="" | |
for response in stream2: | |
output2 += response.token.text | |
lines = output2.strip().strip("\n").split("\n") | |
for i,line in enumerate(lines): | |
if line.startswith("Health: "): | |
try: | |
new_health = line.split("Health: ")[1] | |
health=int(new_health) | |
print(health) | |
except Exception as e: | |
print (f'{health}--Error :: {e}') | |
if line.startswith("2. "): | |
print(line) | |
if line.startswith("3. "): | |
print(line) | |
if line.startswith("4. "): | |
print(line) | |
if line.startswith("5. "): | |
print(line) | |
else: | |
print(f'Line:: {line}') | |
if history: | |
history.append((prompt,output)) | |
yield "",history,health | |
else: | |
yield "",[(prompt,output)],health | |
def clear_fn(): | |
return None,None | |
with gr.Blocks() as app: | |
gr.HTML("""<center><h1>Mixtral 8x7B RPG</h1><h3>Role Playing Game Master</h3>""") | |
chatbot = gr.Chatbot(label="Mixtral 8x7B Chatbot",show_copy_button=True) | |
with gr.Row(): | |
with gr.Column(scale=3): | |
prompt=gr.Textbox(label = "Prompt") | |
with gr.Column(scale=1): | |
button=gr.Button() | |
#models_dd=gr.Dropdown(choices=[m for m in return_list],interactive=True) | |
with gr.Row(): | |
stop_button=gr.Button("Stop") | |
clear_btn = gr.Button("Clear") | |
with gr.Row(): | |
tokens = gr.Slider(label="Max new tokens",value=1048,minimum=0,maximum=1048*10,step=64,interactive=True,info="The maximum numbers of new tokens") | |
json_out=gr.JSON() | |
health=gr.Number(value=100) | |
#text=gr.JSON() | |
#inp_query.change(search_models,inp_query,models_dd) | |
#test_b=test_btn.click(itt,url,e_box) | |
clear_btn.click(clear_fn,None,[prompt,chatbot]) | |
go=button.click(generate,[prompt,chatbot,tokens,health],[prompt,chatbot,health]) | |
stop_button.click(None,None,None,cancels=[go]) | |
app.launch(show_api=False) | |
''' | |
examples=[["Start the Game", None, None, None, None, None, ], | |
["Start a Game based in the year 1322", None, None, None, None, None,], | |
] | |
gr.ChatInterface( | |
fn=generate, | |
chatbot=gr.Chatbot(show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel"), | |
additional_inputs=additional_inputs, | |
title="Mixtral RPG Game Master", | |
examples=examples, | |
concurrency_limit=20, | |
).launch(share=True,show_api=True) | |
''' | |