|
import copy
import gc
import os
import re
import urllib.request

import gradio as gr
from rwkv.model import RWKV
from rwkv.utils import PIPELINE, PIPELINE_ARGS
|
|
|
# Maximum number of prompt tokens fed to the model; `evaluate` keeps only the
# last `ctx_limit` tokens of a longer prompt.
ctx_limit = 4096
# Checkpoint file name. Also used as the local download path and the page title.
title = "RWKV-5-World-0.1B-v1-20230803-ctx4096.pth"

url = f"https://huggingface.co/BlinkDL/rwkv-5-world/resolve/main/{title}"
# Fetch the checkpoint only when it is not already on disk, so restarts do not
# re-download it. The download goes to a temporary name and is renamed once
# complete, so an interrupted transfer never leaves a truncated file at `title`.
if not os.path.isfile(title):
    partial_path = f"{title}.part"
    urllib.request.urlretrieve(url, partial_path)
    os.replace(partial_path, title)

model = RWKV(model=title, strategy='cpu bf16')
pipeline = PIPELINE(model, "rwkv_vocab_v20230424")
|
|
|
def _normalize_prompt_text(text):
    """Return `text` stripped, with CRLF turned into LF and newline runs collapsed.

    `None` is treated as an empty string. Runs of two or more newlines are
    reduced to a single newline because a blank line ("\\n\\n") is what
    separates the sections of the prompt.
    """
    if text is None:
        return ""
    return re.sub(r"\n{2,}", "\n", text.strip().replace("\r\n", "\n"))


def generate_prompt(instruction, input=None, history=None):
    """Build the text prompt sent to the model.

    Args:
        instruction: The user's instruction / message.
        input: Optional extra context. When non-empty the
            "Instruction / Input / Response" template is used, otherwise the
            "User / Assistant" template.
        history: Optional iterable of ``[user_text, assistant_text]`` pairs that
            are rendered before the current turn. Pairs whose assistant text
            is ``None`` (a turn that has no reply yet) are skipped.

    Returns:
        The prompt string, ending with the "Response:" or "Assistant:" cue.
    """
    history_str = "".join(
        f"Instruction: {pair[0]}\n\nAssistant: {pair[1]}\n\n"
        for pair in (history or [])
        if pair[1] is not None
    )

    instruction = _normalize_prompt_text(instruction)
    input = _normalize_prompt_text(input)

    if input:
        return f"{history_str}Instruction: {instruction}\n\nInput: {input}\n\nResponse:"
    return f"{history_str}User: {instruction}\n\nAssistant:"
|
|
|
# Sampling settings appended to every example row. Their order follows the
# Dataset columns: max tokens, temperature, top-p, presence penalty,
# count penalty.
_EXAMPLE_SETTINGS = (300, 1.2, 0.5, 0.5, 0.5)

# (instruction, input) pairs offered as clickable examples.
_EXAMPLE_PROMPTS = (
    ("東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。", ""),
    ("Écrivez un programme Python pour miner 1 Bitcoin, avec des commentaires.", ""),
    ("Write a song about ravens.", ""),
    ("Explain the following metaphor: Life is like cats.", ""),
    ("Write a story using the following information", "A man named Alex chops a tree down"),
    ("Generate a list of adjectives that describe a person as brave.", ""),
    ("You have $100, and your goal is to turn that into as much money as possible with AI and Machine Learning. Please respond with detailed plan.", ""),
)

# One row per example: [instruction, input, *sampling settings].
examples = [
    [instruction_text, input_text, *_EXAMPLE_SETTINGS]
    for instruction_text, input_text in _EXAMPLE_PROMPTS
]
|
|
|
def evaluate(
    instruction,
    input=None,
    token_count=333,
    temperature=1.0,
    top_p=0.5,
    presencePenalty=0.5,
    countPenalty=0.5,
    history=None,
):
    """Generate a completion token by token, yielding the text produced so far.

    This is a generator: every time newly sampled tokens decode cleanly the
    accumulated, stripped output string is yielded, and the final text is
    yielded once more after generation stops.

    Args:
        instruction: Instruction / user message.
        input: Optional extra context; ``None`` is treated as empty.
        token_count: Maximum number of tokens to sample.
        temperature: Sampling temperature, clamped to a minimum of 0.2.
        top_p: Nucleus sampling threshold.
        presencePenalty: Flat logit penalty applied to every token already
            generated.
        countPenalty: Logit penalty multiplied by a token's (decayed)
            occurrence count.
        history: Optional list of ``[user, assistant]`` pairs placed before the
            current turn; ``None`` is treated as no history.

    Yields:
        The stripped output text generated so far.
    """
    args = PIPELINE_ARGS(
        temperature=max(0.2, float(temperature)),
        top_p=float(top_p),
        alpha_frequency=countPenalty,
        alpha_presence=presencePenalty,
        token_ban=[],
        token_stop=[0],
    )

    def _clean(text):
        # Normalise CRLF before collapsing, otherwise "\r\n\r\n" is not seen as
        # a run of newlines by the regex. None becomes an empty string.
        return re.sub(r'\n{2,}', '\n', (text or '').replace('\r\n', '\n')).strip()

    instruction = _clean(instruction)
    input = _clean(input)
    ctx = generate_prompt(instruction, input, history or [])
    print(ctx + "\n")

    all_tokens = []
    out_last = 0  # index in all_tokens of the first token not yet emitted
    out_str = ''
    occurrence = {}  # token id -> decayed count of how often it was sampled
    state = None
    # Pre-bind so the cleanup below is valid even when the loop never runs
    # (token_count <= 0).
    out = None
    token = None
    try:
        for i in range(int(token_count)):
            # First step feeds the (tail of the) whole prompt; later steps feed
            # only the token sampled in the previous step.
            step_tokens = pipeline.encode(ctx)[-ctx_limit:] if i == 0 else [token]
            out, state = model.forward(step_tokens, state)
            for n in occurrence:
                out[n] -= (args.alpha_presence + occurrence[n] * args.alpha_frequency)

            token = pipeline.sample_logits(out, temperature=args.temperature, top_p=args.top_p)
            if token in args.token_stop:
                break
            all_tokens += [token]
            # Decay older counts so the repetition penalty fades over time.
            for seen in occurrence:
                occurrence[seen] *= 0.996
            occurrence[token] = occurrence.get(token, 0) + 1

            tmp = pipeline.decode(all_tokens[out_last:])
            # Hold back text containing U+FFFD (presumably an incomplete
            # multi-byte character) until later tokens complete it.
            if '\ufffd' not in tmp:
                out_str += tmp
                yield out_str.strip()
                out_last = i + 1
            # A blank line marks the end of the answer in this prompt format.
            if '\n\n' in out_str:
                break
    finally:
        # Release the logits and recurrent state even if the consumer stops
        # iterating early.
        del out
        del state
        gc.collect()
    yield out_str.strip()
|
|
|
def user(message, chatbot):
    """Append `message` as a new, unanswered turn and clear the input box.

    Returns a tuple of an empty string (the new textbox content) and a new
    list holding the previous turns followed by ``[message, None]``. A falsy
    `chatbot` is treated as an empty conversation.
    """
    previous_turns = chatbot if chatbot else []
    pending_turn = [message, None]
    updated_turns = previous_turns + [pending_turn]
    return "", updated_turns
|
|
|
def alternative(chatbot, history):
    """Reset the last reply so that an alternative answer can be generated.

    When both `chatbot` and `history` are non-empty, the reply of the last
    chatbot turn is set to ``None`` and ``history[0]`` is replaced by a deep
    copy of ``history[1]`` (presumably restoring an earlier saved state; confirm
    against the caller). Both objects are modified in place and returned.
    If either is empty or ``None`` they are returned untouched.
    """
    if chatbot and history:
        last_turn = chatbot[-1]
        last_turn[1] = None
        history[0] = copy.deepcopy(history[1])
    return chatbot, history
|
|
|
|
|
# Gradio UI: an "Instruct mode" tab for single-shot generation and a
# "Chat mode" tab for multi-turn conversation, both backed by `evaluate`.
with gr.Blocks(title=title) as demo:
    gr.HTML(f"<div style=\"text-align: center;\">\n<h1>🌍World - {title}</h1>\n</div>")

    with gr.Tab("Instruct mode"):
        gr.Markdown(f"100% RNN RWKV-LM **trained on 100+ natural languages**. Demo limited to ctxlen {ctx_limit}. For best results, <b>keep your prompt short and clear</b>.")
        with gr.Row():
            with gr.Column():
                instruction = gr.Textbox(lines=2, label="Instruction", value='東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。')
                input = gr.Textbox(lines=2, label="Input", placeholder="")
                token_count = gr.Slider(10, 512, label="Max Tokens", step=10, value=333)
                temperature = gr.Slider(0.2, 2.0, label="Temperature", step=0.1, value=1.2)
                top_p = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.3)
                presence_penalty = gr.Slider(0.0, 1.0, label="Presence Penalty", step=0.1, value=0)
                count_penalty = gr.Slider(0.0, 1.0, label="Count Penalty", step=0.1, value=0.7)
            with gr.Column():
                with gr.Row():
                    submit = gr.Button("Submit", variant="primary")
                    clear = gr.Button("Clear", variant="secondary")
                output = gr.Textbox(label="Output", lines=5)
        data = gr.Dataset(components=[instruction, input, token_count, temperature, top_p, presence_penalty, count_penalty], samples=examples, label="Example Instructions", headers=["Instruction", "Input", "Max Tokens", "Temperature", "Top P", "Presence Penalty", "Count Penalty"])

        def run_instruct(prompt, extra_input, max_tokens, temp, nucleus_p, presence, count):
            """Stream a single-turn completion with no conversation history."""
            # Event inputs must be components, so the empty history is supplied
            # here instead of being listed among the inputs.
            yield from evaluate(prompt, extra_input, max_tokens, temp, nucleus_p, presence, count, [])

        submit.click(run_instruct, [instruction, input, token_count, temperature, top_p, presence_penalty, count_penalty], [output])
        clear.click(lambda: None, [], [output])
        data.click(lambda x: x, [data], [instruction, input, token_count, temperature, top_p, presence_penalty, count_penalty])

    with gr.Tab("Chat mode"):
        # NOTE(review): the nesting of this row/columns was reconstructed from
        # a source whose indentation was lost - confirm the intended layout.
        with gr.Row():
            chatbot = gr.Chatbot()
            with gr.Column():
                msg = gr.Textbox(scale=4, show_label=False, placeholder="Enter text and press enter", container=False)
                chat_clear = gr.Button("Clear")
            with gr.Column():
                chat_token_count = gr.Slider(10, 512, label="Max Tokens", step=10, value=333)
                chat_temperature = gr.Slider(0.2, 2.0, label="Temperature", step=0.1, value=1.2)
                chat_top_p = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.3)
                chat_presence_penalty = gr.Slider(0.0, 1.0, label="Presence Penalty", step=0.1, value=0)
                chat_count_penalty = gr.Slider(0.0, 1.0, label="Count Penalty", step=0.1, value=0.7)

        def clear_chat():
            """Return an empty message box value and an empty conversation."""
            return "", []

        def user_msg(message, history):
            """Append the submitted message as an unanswered turn and clear the box."""
            history = history or []
            return "", history + [[message, None]]

        def chat(history, max_tokens, temp, nucleus_p, presence, count):
            """Stream the assistant reply for the last (unanswered) turn.

            The current slider values arrive as event inputs; reading
            ``component.value`` would only give the initial values.
            """
            if not history:
                yield history
                return

            message = history[-1][0]
            # Only completed turns are passed as context; the pending turn is
            # the instruction itself.
            earlier_turns = [pair for pair in history[:-1] if pair[1] is not None]
            for partial in evaluate(message, "", max_tokens, temp, nucleus_p, presence, count, earlier_turns):
                history[-1][1] = partial
                yield history

        msg.submit(user_msg, [msg, chatbot], [msg, chatbot], queue=False).then(
            chat,
            [chatbot, chat_token_count, chat_temperature, chat_top_p, chat_presence_penalty, chat_count_penalty],
            chatbot,
            api_name="chat",
        )
        # clear_chat returns two values, so both the textbox and the chatbot
        # are listed as outputs.
        chat_clear.click(clear_chat, None, [msg, chatbot], queue=False)

demo.queue(max_size=10)
demo.launch(share=False)