Spaces:
Paused
Paused
import os | |
from threading import Thread | |
from typing import Iterator | |
import gradio as gr | |
import spaces | |
import torch | |
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, BitsAndBytesConfig | |
from peft import PeftModel | |
MAX_MAX_NEW_TOKENS = 2048 | |
DEFAULT_MAX_NEW_TOKENS = 1024 | |
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) | |
DESCRIPTION = """\ | |
# Storytell AI | |
Welcome to the Storytell AI space, crafted with care by Ranam & George. Dive into the world of educational storytelling with our [Storytell](https://huggingface.co/ranamhamoud/storytell) model. This iteration of the Llama 2 model with 7 billion parameters is fine-tuned to generate educational stories that engage and educate. Enjoy a journey of discovery and creativity—your storytelling lesson begins here! | |
""" | |
LICENSE = """ | |
<p/> | |
--- | |
As a derivate work of [Llama-2-7b-chat](https://huggingface.co/meta-llama/Llama-2-7b-chat) by Meta, | |
this demo is governed by the original [license](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/LICENSE.txt) and [acceptable use policy](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/USE_POLICY.md). | |
""" | |
if not torch.cuda.is_available(): | |
DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>" | |
if torch.cuda.is_available(): | |
bnb_config = BitsAndBytesConfig( | |
load_in_8bit=True, | |
bnb_4bit_compute_dtype=torch.float16, | |
) | |
model_id = "meta-llama/Llama-2-7b-chat-hf" | |
base_model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto",quantization_config=bnb_config) | |
model = PeftModel.from_pretrained(base_model,"ranamhamoud/storytell") | |
tokenizer = AutoTokenizer.from_pretrained(model_id) | |
tokenizer.pad_token = tokenizer.eos_token | |
def save_chat_history(chat_history): | |
# Ensure the directory exists | |
os.makedirs(os.path.dirname('data/'), exist_ok=True) | |
file_path = 'data/chat_history.json' | |
# Generate a unique ID for the conversation | |
conversation_id = str(uuid.uuid4()) | |
# Prepare the conversation entry | |
conversation_entry = { | |
"id": conversation_id, | |
"chat_history": chat_history | |
} | |
# Load existing data if the file exists | |
if os.path.exists(file_path): | |
with open(file_path, 'r') as file: | |
data = json.load(file) | |
else: | |
data = [] | |
# Append the new conversation entry | |
data.append(conversation_entry) | |
# Save the updated data back to the file | |
with open(file_path, 'w') as file: | |
json.dump(data, file, indent=4) | |
return conversation_id | |
def make_prompt(entry): | |
return f"### Human: YOUR INSTRUCTION HERE,ONLY TELL A STORY,INCLUDE AT LEAST AN MCQ, FILL IN THE BLANK AND TRUE OR FALSE: {entry} ### Assistant:" | |
def generate( | |
message: str, | |
chat_history: list[tuple[str, str]], | |
max_new_tokens: int = 1024, | |
temperature: float = 0.1, # Lower -> less random | |
top_p: float = 0.1, # Lower -> less random, considering only the top 10% of tokens at each step | |
top_k: int = 1, # Least random, only the most likely next token is considered | |
repetition_penalty: float = 1.0, # No repetition penalty | |
) -> Iterator[str]: | |
conversation = [] | |
for user, assistant in chat_history: | |
conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) | |
conversation.append({"role": "user", "content": make_prompt(message)}) | |
enc = tokenizer(make_prompt(message), return_tensors="pt", padding=True, truncation=True) | |
input_ids = enc.input_ids | |
if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: | |
input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] | |
gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") | |
input_ids = input_ids.to(model.device) | |
streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True) | |
generate_kwargs = dict( | |
{"input_ids": input_ids}, | |
streamer=streamer, | |
max_new_tokens=max_new_tokens, | |
do_sample=True, | |
top_p=top_p, | |
top_k=top_k, | |
temperature=temperature, | |
num_beams=1, | |
repetition_penalty=repetition_penalty, | |
) | |
t = Thread(target=model.generate, kwargs=generate_kwargs) | |
t.start() | |
outputs = [] | |
for text in streamer: | |
outputs.append(text) | |
yield "".join(outputs) | |
final_story = "".join(outputs) # The complete story | |
conversation_id = save_chat_history(chat_history + [(message, final_story)]) | |
yield f"Conversation ID: {conversation_id}" | |
chat_interface = gr.ChatInterface( | |
fn=generate, | |
stop_btn=None, | |
examples=[ | |
["Can you explain briefly to me what is the Python programming language?"], | |
["I'm curious about Merge Sort."], | |
["Teach me about conditionals."] | |
], | |
) | |
with gr.Blocks(css="style.css") as demo: | |
gr.Markdown(DESCRIPTION) | |
chat_interface.render() | |
gr.Markdown(LICENSE) | |
if __name__ == "__main__": | |
demo.queue(max_size=20).launch() | |