|
import torch |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, StoppingCriteria, StoppingCriteriaList, TextIteratorStreamer |
|
import time |
|
import numpy as np |
|
from torch.nn import functional as F |
|
import os |
|
from .base_model import BaseLLMModel |
|
from threading import Thread |
|
|
|
STABLELM_MODEL = None |
|
STABLELM_TOKENIZER = None |
|
|
|
|
|
class StopOnTokens(StoppingCriteria): |
|
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: |
|
stop_ids = [50278, 50279, 50277, 1, 0] |
|
for stop_id in stop_ids: |
|
if input_ids[0][-1] == stop_id: |
|
return True |
|
return False |
|
|
|
|
|
class StableLM_Client(BaseLLMModel): |
|
def __init__(self, model_name, user_name="") -> None: |
|
super().__init__(model_name=model_name, user=user_name) |
|
global STABLELM_MODEL, STABLELM_TOKENIZER |
|
print(f"Starting to load StableLM to memory") |
|
if model_name == "StableLM": |
|
model_name = "stabilityai/stablelm-tuned-alpha-7b" |
|
else: |
|
model_name = f"models/{model_name}" |
|
if STABLELM_MODEL is None: |
|
STABLELM_MODEL = AutoModelForCausalLM.from_pretrained( |
|
model_name, torch_dtype=torch.float16).cuda() |
|
if STABLELM_TOKENIZER is None: |
|
STABLELM_TOKENIZER = AutoTokenizer.from_pretrained(model_name) |
|
self.generator = pipeline( |
|
'text-generation', model=STABLELM_MODEL, tokenizer=STABLELM_TOKENIZER, device=0) |
|
print(f"Sucessfully loaded StableLM to the memory") |
|
self.system_prompt = """StableAssistant |
|
- StableAssistant is A helpful and harmless Open Source AI Language Model developed by Stability and CarperAI. |
|
- StableAssistant is excited to be able to help the user, but will refuse to do anything that could be considered harmful to the user. |
|
- StableAssistant is more than just an information source, StableAssistant is also able to write poetry, short stories, and make jokes. |
|
- StableAssistant will refuse to participate in anything that could harm a human.""" |
|
self.max_generation_token = 1024 |
|
self.top_p = 0.95 |
|
self.temperature = 1.0 |
|
|
|
def _get_stablelm_style_input(self): |
|
history = self.history + [{"role": "assistant", "content": ""}] |
|
print(history) |
|
messages = self.system_prompt + \ |
|
"".join(["".join(["<|USER|>"+history[i]["content"], "<|ASSISTANT|>"+history[i + 1]["content"]]) |
|
for i in range(0, len(history), 2)]) |
|
return messages |
|
|
|
def _generate(self, text, bad_text=None): |
|
stop = StopOnTokens() |
|
result = self.generator(text, max_new_tokens=self.max_generation_token, num_return_sequences=1, num_beams=1, do_sample=True, |
|
temperature=self.temperature, top_p=self.top_p, top_k=1000, stopping_criteria=StoppingCriteriaList([stop])) |
|
return result[0]["generated_text"].replace(text, "") |
|
|
|
def get_answer_at_once(self): |
|
messages = self._get_stablelm_style_input() |
|
return self._generate(messages), len(messages) |
|
|
|
def get_answer_stream_iter(self): |
|
stop = StopOnTokens() |
|
messages = self._get_stablelm_style_input() |
|
|
|
|
|
model_inputs = STABLELM_TOKENIZER( |
|
[messages], return_tensors="pt").to("cuda") |
|
streamer = TextIteratorStreamer( |
|
STABLELM_TOKENIZER, timeout=10., skip_prompt=True, skip_special_tokens=True) |
|
generate_kwargs = dict( |
|
model_inputs, |
|
streamer=streamer, |
|
max_new_tokens=self.max_generation_token, |
|
do_sample=True, |
|
top_p=self.top_p, |
|
top_k=1000, |
|
temperature=self.temperature, |
|
num_beams=1, |
|
stopping_criteria=StoppingCriteriaList([stop]) |
|
) |
|
t = Thread(target=STABLELM_MODEL.generate, kwargs=generate_kwargs) |
|
t.start() |
|
|
|
partial_text = "" |
|
for new_text in streamer: |
|
partial_text += new_text |
|
yield partial_text |
|
|