import gradio as gr import gc, copy, re from rwkv.model import RWKV from rwkv.utils import PIPELINE, PIPELINE_ARGS from huggingface_hub import hf_hub_download ctx_limit = 4096 title = "RWKV-5-World-1B5-v2-20231025-ctx4096" model_path = hf_hub_download(repo_id="BlinkDL/rwkv-5-world", filename=f"{title}.pth") model = RWKV(model=model_path, strategy="cpu bf16") pipeline = PIPELINE(model, "rwkv_vocab_v20230424") def generate_prompt(instruction, input=None, history=None): has_history = (history is not None) if has_history: input = "\"" for pair in history: if len(pair[0]) > 0 and len(pair[1]) > 0: input += f"User: {pair[0]}\nAssistant: {pair[1]}\n" input = input[:-1] + "\"\n" + f"User: {instruction}" instruction = "Based on the Conversation History, generate a Response for the User." if instruction: instruction = ( instruction.strip() .replace("\r\n", "\n") .replace("\n\n", "\n") .replace("\n\n", "\n") ) if input and len(input) > 0: input = ( input.strip() .replace("\r\n", "\n") .replace("\n\n", "\n") .replace("\n\n", "\n") ) return f"""Instruction: {instruction} Input: {input} Response:""" else: return f"""User: {instruction} Assistant:""" examples = [ ["東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。", "", 300, 1.2, 0.5, 0.5, 0.5], [ "Écrivez un programme Python pour miner 1 Bitcoin, avec des commentaires.", "", 300, 1.2, 0.5, 0.5, 0.5, ], ["Write a song about ravens.", "", 300, 1.2, 0.5, 0.5, 0.5], ["Explain the following metaphor: Life is like cats.", "", 300, 1.2, 0.5, 0.5, 0.5], [ "Write a story using the following information", "A man named Alex chops a tree down", 300, 1.2, 0.5, 0.5, 0.5, ], [ "Generate a list of adjectives that describe a person as brave.", "", 300, 1.2, 0.5, 0.5, 0.5, ], [ "You have $100, and your goal is to turn that into as much money as possible with AI and Machine Learning. Please respond with detailed plan.", "", 300, 1.2, 0.5, 0.5, 0.5, ], ] def generator( instruction, input=None, token_count=333, temperature=1.0, top_p=0.5, presencePenalty=0.5, countPenalty=0.5, history=None ): args = PIPELINE_ARGS( temperature=max(2.0, float(temperature)), top_p=float(top_p), alpha_frequency=countPenalty, alpha_presence=presencePenalty, token_ban=[], # ban the generation of some tokens token_stop=[0], # stop generation whenever you see any token here ) instruction = re.sub(r"\n{2,}", "\n", instruction).strip().replace("\r\n", "\n") no_history = (history is None) if no_history: input = re.sub(r"\n{2,}", "\n", input).strip().replace("\r\n", "\n") ctx = generate_prompt(instruction, input, history) print(ctx + "\n") all_tokens = [] out_last = 0 out_str = "" occurrence = {} state = None for i in range(int(token_count)): out, state = model.forward( pipeline.encode(ctx)[-ctx_limit:] if i == 0 else [token], state ) for n in occurrence: out[n] -= args.alpha_presence + occurrence[n] * args.alpha_frequency token = pipeline.sample_logits( out, temperature=args.temperature, top_p=args.top_p ) if token in args.token_stop: break all_tokens += [token] for xxx in occurrence: occurrence[xxx] *= 0.996 if token not in occurrence: occurrence[token] = 1 else: occurrence[token] += 1 tmp = pipeline.decode(all_tokens[out_last:]) if "\ufffd" not in tmp: out_str += tmp if no_history: yield out_str.strip() else: yield tmp out_last = i + 1 if "\n\n" in out_str: break del out del state gc.collect() if no_history: yield out_str.strip() def user(message, chatbot): chatbot = chatbot or [] return "", chatbot + [[message, None]] def alternative(chatbot, history): if not chatbot or not history: return chatbot, history chatbot[-1][1] = None history[0] = copy.deepcopy(history[1]) return chatbot, history with gr.Blocks(title=title) as demo: gr.HTML(f'
\n

🌍World - {title}

\n
') gr.Markdown( f"100% RNN RWKV-LM **trained on 100+ natural languages**. Demo limited to ctxlen {ctx_limit}. For best results, keep your prompt short and clear." + "\n\n" + f"Clone this space for faster inference if you can run the app on GPU or better CPU. To use CUDA, replace strategy='cpu bf16' with strategy='cuda fp16' in `app.py`." ) with gr.Tab("Chat mode"): with gr.Row(): with gr.Column(): chatbot = gr.Chatbot() msg = gr.Textbox( scale=4, show_label=False, placeholder="Enter text and press enter", container=False, ) clear = gr.ClearButton([msg, chatbot]) with gr.Column(): token_count_chat = gr.Slider( 10, 512, label="Max Tokens", step=10, value=333 ) temperature_chat = gr.Slider( 0.2, 2.0, label="Temperature", step=0.1, value=1.2 ) top_p_chat = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.3) presence_penalty_chat = gr.Slider( 0.0, 1.0, label="Presence Penalty", step=0.1, value=0 ) count_penalty_chat = gr.Slider( 0.0, 1.0, label="Count Penalty", step=0.1, value=0.7 ) def clear_chat(): return "", [] def user_msg(message, history): history = history or [] return "", history + [[message, None]] def respond(history=None): global token_count_chat, temperature_chat, top_p_chat, presence_penalty_chat, count_penalty_chat # get the lastest user message and the additional parameters instruction = history[-1][0] token_count = token_count_chat.value temperature = temperature_chat.value top_p = top_p_chat.value presence_penalty = presence_penalty_chat.value count_penalty = count_penalty_chat.value history[-1][1] = "" for character in generator( instruction, None, token_count, temperature, top_p, presence_penalty, count_penalty, history ): history[-1][1] += character yield history msg.submit(user_msg, [msg, chatbot], [msg, chatbot], queue=False).then( respond, chatbot, chatbot, api_name="chat" ) with gr.Tab("Instruct mode"): with gr.Row(): with gr.Column(): instruction = gr.Textbox( lines=2, label="Instruction", value="東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。", ) input_instruct = gr.Textbox( lines=2, label="Input", placeholder="", value="" ) token_count_instruct = gr.Slider( 10, 512, label="Max Tokens", step=10, value=333 ) temperature_instruct = gr.Slider( 0.2, 2.0, label="Temperature", step=0.1, value=1.2 ) top_p_instruct = gr.Slider( 0.0, 1.0, label="Top P", step=0.05, value=0.3 ) presence_penalty_instruct = gr.Slider( 0.0, 1.0, label="Presence Penalty", step=0.1, value=0 ) count_penalty_instruct = gr.Slider( 0.0, 1.0, label="Count Penalty", step=0.1, value=0.7 ) with gr.Column(): with gr.Row(): submit = gr.Button("Submit", variant="primary") clear = gr.Button("Clear", variant="secondary") output = gr.Textbox(label="Output", lines=5) data = gr.Dataset( components=[ instruction, input_instruct, token_count_instruct, temperature_instruct, top_p_instruct, presence_penalty_instruct, count_penalty_instruct, ], samples=examples, label="Example Instructions", headers=[ "Instruction", "Input", "Max Tokens", "Temperature", "Top P", "Presence Penalty", "Count Penalty", ], ) submit.click( generator, [ instruction, input_instruct, token_count_instruct, temperature_instruct, top_p_instruct, presence_penalty_instruct, count_penalty_instruct, ], [output], ) clear.click(lambda: None, [], [output]) data.click( lambda x: x, [data], [ instruction, input_instruct, token_count_instruct, temperature_instruct, top_p_instruct, presence_penalty_instruct, count_penalty_instruct, ], ) demo.queue(max_size=10) demo.launch(share=False)