Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, StoppingCriteria, StoppingCriteriaList | |
| import time | |
| import numpy as np | |
| from torch.nn import functional as F | |
| import os | |
| # auth_key = os.environ["HF_ACCESS_TOKEN"] | |
# One-time start-up work: load the StableLM checkpoint in float16, move it to
# the GPU, load its tokenizer and wrap both in a text-generation pipeline.
print("Starting to load the model to memory")
m = AutoModelForCausalLM.from_pretrained(
    "stabilityai/stablelm-tuned-alpha-7b",
    torch_dtype=torch.float16,
)
m = m.cuda()
tok = AutoTokenizer.from_pretrained("stabilityai/stablelm-tuned-alpha-7b")
generator = pipeline(
    task="text-generation",
    model=m,
    tokenizer=tok,
    device=0,
)
print("Sucessfully loaded the model to the memory")
# System prompt describing the assistant persona. It is the initial value of
# the hidden "System Message" textbox in the UI.
start_message = "\n".join(
    [
        "<|SYSTEM|># StableAssistant",
        "- StableAssistant is A helpful and harmless Open Source AI Language Model developed by Stability and CarperAI.",
        "- StableAssistant is excited to be able to help the user, but will refuse to do anything that could be considered harmful to the user.",
        "- StableAssistant is more than just an information source, StableAssistant is also able to write poetry, short stories, and make jokes.",
        "- StableAssistant will refuse to participate in anything that could harm a human.",
    ]
)
class StopOnTokens(StoppingCriteria):
    """Stopping criterion that ends generation on a fixed set of token ids."""

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when the newest token of the first sequence is a stop id.

        Only ``input_ids[0][-1]`` is inspected; ``scores`` and ``kwargs`` are
        accepted for interface compatibility and are not used.
        """
        newest_token = input_ids[0][-1]
        return any(
            newest_token == stop_id
            for stop_id in (50278, 50279, 50277, 1, 0)
        )
def contrastive_generate(text, bad_text):
    """Sample a continuation of ``text`` that is steered away from ``bad_text``.

    Both prompts are fed through the module-level model ``m`` in lockstep,
    each with its own key/value cache. At every step the next-token
    log-probabilities of the ``text`` branch are reduced by those of the
    ``bad_text`` branch, but only for tokens the ``text`` branch assigns a
    probability above 0.1. The adjusted scores are renormalised and one token
    is sampled from them; that token is appended to both branches.

    Args:
        text: Prompt whose continuation is wanted.
        bad_text: Contrast prompt whose likely continuations are penalised.

    Returns:
        The decoded string of sampled tokens. Sampling ends after 1024 tokens
        or as soon as one of the stop ids is drawn (the stop token itself is
        not included).
    """
    # Token ids that end sampling (same set the loop originally checked).
    stop_ids = {50278, 50279, 50277, 1, 0}
    # Each prompt is cut to its first 4096 - 1024 tokens before generation.
    max_prompt_tokens = 4096 - 1024
    max_new_tokens = 1024

    with torch.no_grad():
        tokens = tok(text, return_tensors="pt")["input_ids"].cuda()[:, :max_prompt_tokens]
        bad_tokens = tok(bad_text, return_tensors="pt")["input_ids"].cuda()[:, :max_prompt_tokens]
        history = None
        bad_history = None
        curr_output = []

        for _ in range(max_new_tokens):
            out = m(tokens, past_key_values=history, use_cache=True)
            history = out.past_key_values
            bad_out = m(bad_tokens, past_key_values=bad_history, use_cache=True)
            bad_history = bad_out.past_key_values

            # log_softmax instead of log(softmax(...)): an underflowed
            # probability would otherwise become -inf and turn the
            # subtraction below into +inf / NaN.
            log_probs = F.log_softmax(out.logits.float(), dim=-1)[0][-1].cpu()
            bad_log_probs = F.log_softmax(bad_out.logits.float(), dim=-1)[0][-1].cpu()

            # Penalise only tokens the main branch already considers likely.
            confident = log_probs.exp() > 0.1
            log_probs[confident] = log_probs[confident] - bad_log_probs[confident]

            # Explicit dim: the implicit-dim form of softmax is deprecated.
            probs = F.softmax(log_probs, dim=-1)
            next_id = int(torch.multinomial(probs, 1))
            if next_id in stop_ids:
                break
            curr_output.append(next_id)

            # With the caches populated, only the new token is fed next step.
            next_token = torch.tensor([[next_id]], device=tokens.device)
            tokens = next_token
            bad_tokens = next_token

    return tok.decode(curr_output)
def generate(text, bad_text=None):
    """Sample a completion for ``text`` with the module-level ``generator``.

    Args:
        text: Full prompt passed to the text-generation pipeline.
        bad_text: Unused; kept so existing callers that pass it still work.

    Returns:
        The generated text with the leading prompt removed.
    """
    stop = StopOnTokens()
    result = generator(
        text,
        max_new_tokens=1024,
        num_return_sequences=1,
        num_beams=1,
        do_sample=True,
        temperature=1.0,
        top_p=0.95,
        top_k=1000,
        stopping_criteria=StoppingCriteriaList([stop]),
    )
    generated = result[0]["generated_text"]
    # Strip only the prompt prefix. The previous str.replace(text, "") removed
    # every occurrence of the prompt, so a completion that happened to repeat
    # the prompt text was silently altered.
    if text and generated.startswith(text):
        return generated[len(text):]
    return generated
def user(user_message, history):
    """Append the user's message to the chat history as a new, unanswered turn.

    Args:
        user_message: Text the user just submitted.
        history: List of ``[user_text, assistant_text]`` pairs; not mutated.

    Returns:
        A 3-tuple: an empty string, then the extended history twice (the same
        list object in both positions).
    """
    pending_turn = [user_message, ""]
    extended_history = history + [pending_turn]
    return "", extended_history, extended_history
def bot(history, curr_system_message):
    """Generate the assistant reply for the last turn of ``history``.

    The prompt is the system message followed by every turn rendered as
    ``<|USER|>{user}<|ASSISTANT|>{assistant}``. The generated text is written
    into the assistant slot of the last turn (``history`` is mutated in place).

    Args:
        history: List of ``[user_text, assistant_text]`` pairs.
        curr_system_message: System prompt placed before the rendered turns.

    Returns:
        The updated history twice, as a 2-tuple.
    """
    rendered_turns = "".join(
        "<|USER|>" + turn[0] + "<|ASSISTANT|>" + turn[1] for turn in history
    )
    prompt = curr_system_message + rendered_turns
    history[-1][1] = generate(prompt)
    time.sleep(1)
    return history, history
# Build the chat UI and wire its events, then start the app.
# NOTE(review): the component nesting below was reconstructed from a source
# whose indentation was lost -- confirm the layout against the running app.
with gr.Blocks() as demo:
    history = gr.State([])
    gr.Markdown("## StableLM-Tuned-Alpha-7b Chat")
    gr.HTML('''<center><a href="https://huggingface.co/spaces/stabilityai/stablelm-tuned-alpha-chat?duplicate=true"><img src="https://bit.ly/3gLdBN6" alt="Duplicate Space"></a>Duplicate the Space to skip the queue and run in a private space</center>''')
    chatbot = gr.Chatbot().style(height=500)
    with gr.Row():
        with gr.Column(scale=0.70):
            msg = gr.Textbox(label="", placeholder="Chat Message Box")
        with gr.Column(scale=0.30, min_width=0):
            with gr.Row():
                submit = gr.Button("Submit")
                clear = gr.Button("Clear")
    system_msg = gr.Textbox(
        start_message,
        label="System Message",
        interactive=False,
        visible=False,
    )

    # Pressing Enter in the textbox and clicking "Submit" run the same two
    # steps: record the user turn immediately (unqueued), then generate the
    # reply through the queue.
    record_user_turn = dict(
        fn=user,
        inputs=[msg, history],
        outputs=[msg, chatbot, history],
        queue=False,
    )
    generate_reply = dict(
        fn=bot,
        inputs=[chatbot, system_msg],
        outputs=[chatbot, history],
        queue=True,
    )
    for register_event in (msg.submit, submit.click):
        register_event(**record_user_turn).then(**generate_reply)

    # "Clear" resets both the visible chat and the stored history.
    clear.click(
        fn=lambda: [None, []],
        inputs=None,
        outputs=[chatbot, history],
        queue=False,
    )

demo.queue(concurrency_count=5)
demo.launch()