# Author (from page header): FredZhang7
# Commit message: update to rwkv 6 world
# Commit: 2f526e4 (verified)
import gradio as gr
import gc, copy, re
from rwkv.model import RWKV
from rwkv.utils import PIPELINE, PIPELINE_ARGS
from huggingface_hub import hf_hub_download
# Maximum number of prompt tokens fed to the model; `generator` keeps only the
# last `ctx_limit` tokens of a longer prompt.
ctx_limit = 4096

# Previous checkpoint: "RWKV-5-World-1B5-v2-20231025-ctx4096" from "BlinkDL/rwkv-5-world".
# `title` is the checkpoint name WITHOUT the ".pth" extension: it is shown in
# the UI header, and the extension is appended exactly once when building the
# filename to download (previously the name ended up as "….pth.pth").
title = "RWKV-x060-World-1B6-v2-20240208-ctx4096"
model_path = hf_hub_download(repo_id="BlinkDL/rwkv-6-world", filename=f"{title}.pth")
model = RWKV(model=model_path, strategy="cpu bf16")
pipeline = PIPELINE(model, "rwkv_vocab_v20230424")
def generate_prompt(instruction, input=None, history=None):
    """Build the text prompt that is fed to the model.

    Args:
        instruction: The user's instruction / latest chat message.
        input: Optional extra context. Replaced by a flattened chat history
            when ``history`` contains more than one turn.
        history: Optional list of ``[user, assistant]`` pairs. Pairs whose
            user or assistant part is ``None``, or whose assistant part is
            empty, are skipped.

    Returns:
        An ``Instruction/Input/Response`` prompt when there is a non-empty
        input, otherwise a ``User/Assistant`` prompt.
    """
    if instruction:
        instruction = (
            instruction.strip()
            .replace("\r\n", "\n")
            .replace("\n\n", "\n")
            .replace("\n\n", "\n")
        )
    if history is not None and len(history) > 1:
        # Flatten every completed turn into "user1,reply1,user2,reply2".
        # Joining (instead of slicing a trailing comma off a hand-built
        # string) keeps the whole history and is safe when no turn is complete.
        completed_turns = [
            f"{pair[0]},{pair[1]}"
            for pair in history
            if pair[0] is not None and pair[1] is not None and len(pair[1]) > 0
        ]
        input = ",".join(completed_turns) + f". {instruction}"
        instruction = "Generate a Response using the following query."
    if input and len(input) > 0:
        input = (
            input.strip()
            .replace("\r\n", "\n")
            .replace("\n\n", "\n")
            .replace("\n\n", "\n")
        )
        return f"Instruction: {instruction}\nInput: {input}\nResponse:"
    return f"User: {instruction}\nAssistant:"
# Sampling settings shared by every example row, in the order the Dataset
# columns expect them: temperature, top_p, presence penalty, count penalty.
_EXAMPLE_SAMPLING = (1.2, 0.5, 0.5, 0.5)

# One (instruction, input, max tokens) triple per example row.
_EXAMPLE_PROMPTS = (
    ("東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。", "", 300),
    ("Écrivez un programme Python pour miner 1 Bitcoin, avec des commentaires.", "", 333),
    ("Write a song about ravens.", "", 300),
    ("Explain the following metaphor: Life is like cats.", "", 300),
    (
        "Write a story using the following information",
        "A man named Alex chops a tree down",
        333,
    ),
    ("Generate a list of adjectives that describe a person as brave.", "", 333),
    (
        "You have $100, and your goal is to turn that into as much money as possible with AI and Machine Learning. Please respond with detailed plan.",
        "",
        333,
    ),
)

# Rows shown in the "Example Instructions" dataset of the instruct tab:
# [instruction, input, max tokens, temperature, top_p, presence, count].
examples = [
    [example_instruction, example_input, example_tokens, *_EXAMPLE_SAMPLING]
    for example_instruction, example_input, example_tokens in _EXAMPLE_PROMPTS
]
def generator(
    instruction,
    input=None,
    token_count=333,
    temperature=1.0,
    top_p=0.5,
    presencePenalty=0.5,
    countPenalty=0.5,
    history=None,
):
    """Stream a model completion for ``instruction``.

    Args:
        instruction: The user's instruction or latest chat message.
        input: Optional extra context for instruct mode. It is only cleaned
            up here when ``history`` is None; ``None`` is treated as "".
        token_count: Maximum number of tokens to sample.
        temperature: Sampling temperature; values below 0.2 are raised to 0.2.
        top_p: ``top_p`` value handed to the sampler.
        presencePenalty: Flat amount subtracted from the logit of every token
            that has already been generated.
        countPenalty: Extra amount subtracted per (decayed) occurrence of an
            already generated token.
        history: Chat history as ``[user, assistant]`` pairs, or None for
            instruct mode.

    Yields:
        With ``history=None``: the stripped text generated so far, after each
        step that decodes cleanly and once more when generation ends.
        Otherwise: only the newly decoded piece of text for each such step.
    """
    args = PIPELINE_ARGS(
        # Floor (not ceiling) the temperature at 0.2, the sliders' minimum, so
        # the value chosen in the UI is actually honoured.
        temperature=max(0.2, float(temperature)),
        top_p=float(top_p),
        alpha_frequency=countPenalty,
        alpha_presence=presencePenalty,
        token_ban=[],  # ban the generation of some tokens
        token_stop=[0],  # stop generation whenever you see any token here
    )

    # Normalise Windows line endings first so "\r\n\r\n" is collapsed too.
    instruction = re.sub(r"\n{2,}", "\n", instruction.replace("\r\n", "\n")).strip()
    no_history = history is None
    if no_history:
        # `input` defaults to None; treat that as "no extra context".
        input = re.sub(r"\n{2,}", "\n", (input or "").replace("\r\n", "\n")).strip()
    ctx = generate_prompt(instruction, input, history)
    print(ctx + "\n")

    all_tokens = []
    out_last = 0  # index of the first token not yet emitted as text
    out_str = ""
    occurrence = {}  # token id -> decayed count of how often it was generated
    out = None  # pre-bound so the cleanup below works even with zero iterations
    state = None
    for i in range(int(token_count)):
        # First step feeds the (truncated) prompt; later steps feed only the
        # token sampled in the previous step and reuse the recurrent state.
        out, state = model.forward(
            pipeline.encode(ctx)[-ctx_limit:] if i == 0 else [token], state
        )
        for n in occurrence:
            out[n] -= args.alpha_presence + occurrence[n] * args.alpha_frequency

        token = pipeline.sample_logits(
            out, temperature=args.temperature, top_p=args.top_p
        )
        if token in args.token_stop:
            break
        all_tokens.append(token)

        # Decay older counts so long-ago repetitions are penalised less.
        for seen in occurrence:
            occurrence[seen] *= 0.996
        occurrence[token] = occurrence.get(token, 0) + 1

        tmp = pipeline.decode(all_tokens[out_last:])
        # Hold text back while it contains U+FFFD (presumably an incomplete
        # multi-byte character); it is retried with the next token appended.
        if "\ufffd" not in tmp:
            out_str += tmp
            if no_history:
                yield out_str.strip()
            else:
                yield tmp
            out_last = i + 1
        # A blank line in the output ends the answer.
        if "\n\n" in out_str:
            break

    # Drop the references to the last logits and state before collecting.
    del out
    del state
    gc.collect()
    if no_history:
        yield out_str.strip()
def user(message, chatbot):
    """Queue ``message`` as a new, not-yet-answered chat turn.

    Returns a tuple of an empty string and a new list holding the previous
    turns (or none, if ``chatbot`` is falsy) followed by ``[message, None]``.
    """
    previous_turns = chatbot if chatbot else []
    pending_turn = [message, None]
    return "", previous_turns + [pending_turn]
def alternative(chatbot, history):
    """Blank the last reply and overwrite ``history[0]`` with a copy of ``history[1]``.

    When either argument is falsy, both are returned untouched. Otherwise the
    reply of the last ``chatbot`` turn is set to ``None`` and ``history[0]``
    becomes a deep copy of ``history[1]``; both objects are modified in place
    and returned.

    NOTE(review): not called anywhere in the visible code; presumably meant
    for regenerating the last answer — confirm before relying on it.
    """
    if chatbot and history:
        last_turn = chatbot[-1]
        last_turn[1] = None
        history[0] = copy.deepcopy(history[1])
    return chatbot, history
# Build the Gradio UI: a header, a "Chat mode" tab and an "Instruct mode" tab,
# both of which stream their text from `generator`.
# NOTE(review): the nesting below was reconstructed from the order of the
# statements; confirm the layout (which column/row each widget sits in)
# against the running app.
with gr.Blocks(title=title) as demo:
    gr.HTML(f'<div style="text-align: center;">\n<h1>🌍World - {title}</h1>\n</div>')
    gr.Markdown(
        f"100% RNN RWKV-LM **trained on 100+ natural languages**. Demo limited to ctxlen {ctx_limit}. For best results, <b>write short imperative prompts</b> like commands and requests. Example: use \"Tell me what my name is\" instead of \"What's my name?\"."
        + "\n\n"
        + f"Clone this space for faster inference if you can run the app on GPU or better CPU. To use CUDA, replace <code>strategy='cpu bf16'</code> with <code>strategy='cuda fp16'</code> in `app.py`."
    )
    with gr.Tab("Chat mode"):
        with gr.Row():
            # Left column: conversation view, message box and clear button.
            with gr.Column():
                chatbot = gr.Chatbot()
                msg = gr.Textbox(
                    scale=4,
                    show_label=False,
                    placeholder="Enter text and press enter",
                    container=False,
                )
                clear = gr.ClearButton([msg, chatbot])
            # Right column: sampling controls for chat mode.
            with gr.Column():
                token_count_chat = gr.Slider(
                    10, 512, label="Max Tokens", step=10, value=333
                )
                temperature_chat = gr.Slider(
                    0.2, 2.0, label="Temperature", step=0.1, value=1.2
                )
                top_p_chat = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.3)
                presence_penalty_chat = gr.Slider(
                    0.0, 1.0, label="Presence Penalty", step=0.1, value=0
                )
                count_penalty_chat = gr.Slider(
                    0.0, 1.0, label="Count Penalty", step=0.1, value=0.7
                )

        # NOTE(review): `clear_chat` is not referenced in the visible code.
        def clear_chat():
            """Return an empty message and an empty chat history."""
            return "", []

        def user_msg(message, history):
            """Clear the textbox and append `message` as an unanswered turn."""
            history = history or []
            return "", history + [[message, None]]

        def respond(history, token_count, temperature, top_p, presence_penalty, count_penalty):
            """Stream the reply for the last turn of `history`.

            The last turn's reply is reset to "" and every piece yielded by
            `generator` is appended to it; the updated history is yielded
            after each piece.
            """
            instruction = history[-1][0]
            history[-1][1] = ""
            # `generator` receives the history, so it yields incremental text
            # pieces rather than the accumulated text.
            for character in generator(
                instruction,
                None,
                token_count,
                temperature,
                top_p,
                presence_penalty,
                count_penalty,
                history
            ):
                history[-1][1] += character
                yield history

        # On submit: first record the user's message, then stream the reply.
        msg.submit(user_msg, [msg, chatbot], [msg, chatbot], queue=False).then(
            respond, [chatbot, token_count_chat, temperature_chat, top_p_chat, presence_penalty_chat, count_penalty_chat], chatbot, api_name="chat"
        )
    with gr.Tab("Instruct mode"):
        with gr.Row():
            # Left column: prompt fields and sampling controls.
            with gr.Column():
                instruction = gr.Textbox(
                    lines=2,
                    label="Instruction",
                    value="東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。",
                )
                input_instruct = gr.Textbox(
                    lines=2, label="Input", placeholder="", value=""
                )
                token_count_instruct = gr.Slider(
                    10, 512, label="Max Tokens", step=10, value=333
                )
                temperature_instruct = gr.Slider(
                    0.2, 2.0, label="Temperature", step=0.1, value=1.2
                )
                top_p_instruct = gr.Slider(
                    0.0, 1.0, label="Top P", step=0.05, value=0.3
                )
                presence_penalty_instruct = gr.Slider(
                    0.0, 1.0, label="Presence Penalty", step=0.1, value=0
                )
                count_penalty_instruct = gr.Slider(
                    0.0, 1.0, label="Count Penalty", step=0.1, value=0.7
                )
            # Right column: action buttons and the generated output.
            with gr.Column():
                with gr.Row():
                    submit = gr.Button("Submit", variant="primary")
                    # Rebinds `clear`, which named the chat tab's ClearButton above.
                    clear = gr.Button("Clear", variant="secondary")
                output = gr.Textbox(label="Output", lines=5)
        # Clickable example rows; the columns line up with `components`.
        data = gr.Dataset(
            components=[
                instruction,
                input_instruct,
                token_count_instruct,
                temperature_instruct,
                top_p_instruct,
                presence_penalty_instruct,
                count_penalty_instruct,
            ],
            samples=examples,
            label="Example Instructions",
            headers=[
                "Instruction",
                "Input",
                "Max Tokens",
                "Temperature",
                "Top P",
                "Presence Penalty",
                "Count Penalty",
            ],
        )
        # No history is passed here, so `generator` yields the accumulated text.
        submit.click(
            generator,
            [
                instruction,
                input_instruct,
                token_count_instruct,
                temperature_instruct,
                top_p_instruct,
                presence_penalty_instruct,
                count_penalty_instruct,
            ],
            [output],
        )
        # Writing None to the output box empties it.
        clear.click(lambda: None, [], [output])
        # Clicking an example row copies its values into the input widgets.
        data.click(
            lambda x: x,
            [data],
            [
                instruction,
                input_instruct,
                token_count_instruct,
                temperature_instruct,
                top_p_instruct,
                presence_penalty_instruct,
                count_penalty_instruct,
            ],
        )
# Enable the request queue (max_size=10) and start the app without a public
# share link.
demo.queue(max_size=10).launch(share=False)