|
import hmac
import os
from threading import Thread
from typing import Iterator

import gradio as gr
import spaces
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
|
|
|
|
|
# ---------------------------------------------------------------------------
# Startup configuration: secrets, UI description and generation limits.
# Everything here runs at import time.
# ---------------------------------------------------------------------------
print("Starting script...")

# Hugging Face access token, read from the environment. It may be None; the
# script only warns and carries on in that case.
HF_TOKEN = os.environ.get("HF_TOKEN")
if HF_TOKEN is None:
    print("Warning: HF_TOKEN is not set!")

# Password the login form compares against. The built-in fallback is kept so
# existing deployments keep working, but it is visible to anyone who can read
# this file, so make its use obvious in the logs (same style as the HF_TOKEN
# warning above).
PASSWORD = os.getenv("APP_PASSWORD", "mysecretpassword")
if "APP_PASSWORD" not in os.environ:
    print("Warning: APP_PASSWORD is not set; falling back to the built-in default password!")

# Markdown heading shown at the top of the page.
DESCRIPTION = "# FT of Lama"

if not torch.cuda.is_available():
    # Tell both the visitor (page banner) and the operator (log) that the demo
    # cannot serve requests without a GPU.
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"
    print("Warning: No GPU available. This model cannot run on CPU.")
else:
    print("GPU is available!")

# Upper bound and initial value of the "Max new tokens" slider.
MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
# Prompts longer than this many tokens are trimmed before generation;
# overridable through the MAX_INPUT_TOKEN_LENGTH environment variable.
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
|
|
|
|
|
# Load the checkpoint and its tokenizer once at startup. This is skipped
# entirely when no CUDA device is present, in which case `model` and
# `tokenizer` are never defined.
if torch.cuda.is_available():
    model_id = "INSAIT-Institute/BgGPT-Gemma-2-9B-IT-v1.0"
    try:
        print("Loading model...")
        # NOTE(review): weights are loaded in float16; Gemma-2 checkpoints are
        # commonly run in bfloat16 -- confirm float16 is intended here.
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            torch_dtype=torch.float16,
            device_map="auto",
            token=HF_TOKEN,
        )
        print("Model loaded successfully!")

        print("Loading tokenizer...")
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN)
        print("Tokenizer loaded successfully!")
    except Exception as load_error:
        # Log the failure, then let it propagate so startup aborts.
        print(f"Error loading model or tokenizer: {load_error}")
        raise load_error
|
|
|
|
|
@spaces.GPU
def generate(
    message: str,
    chat_history: list[tuple[str, str]],
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> Iterator[str]:
    """Stream the model's reply to ``message`` given the prior conversation.

    Args:
        message: The new user message.
        chat_history: Earlier turns as ``(user, assistant)`` pairs, oldest first.
        max_new_tokens: Forwarded to ``model.generate``.
        temperature: Forwarded to ``model.generate``.
        top_p: Forwarded to ``model.generate``.
        top_k: Forwarded to ``model.generate``.
        repetition_penalty: Forwarded to ``model.generate``.

    Yields:
        The reply accumulated so far (the full text, not just the latest
        chunk), once per chunk produced by the streamer.

    Raises:
        Exception: Any error raised while tokenizing or streaming is logged
            and re-raised unchanged.
    """
    print(f"Received message: {message}")
    print(f"Chat history: {chat_history}")

    # Rebuild the conversation in the role/content format the chat template
    # expects, ending with the new user message.
    conversation = []
    for user, assistant in chat_history:
        conversation.extend(
            [
                {"role": "user", "content": user},
                {"role": "assistant", "content": assistant},
            ]
        )
    conversation.append({"role": "user", "content": message})

    try:
        print("Tokenizing input...")
        # add_generation_prompt=True appends the template's assistant-turn
        # header so the model starts a reply instead of continuing the user
        # turn.
        input_ids = tokenizer.apply_chat_template(
            conversation,
            add_generation_prompt=True,
            return_tensors="pt",
        )
        print(f"Input tokenized: {input_ids.shape}")

        if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
            # Keep only the most recent tokens of an over-long prompt.
            input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
            gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
            print("Trimmed input tokens due to length.")

        input_ids = input_ids.to(model.device)
        print("Input moved to the model's device.")

        streamer = TextIteratorStreamer(tokenizer, timeout=20.0, skip_prompt=True, skip_special_tokens=True)
        generate_kwargs = {
            "input_ids": input_ids,
            "streamer": streamer,
            "max_new_tokens": max_new_tokens,
            "do_sample": True,
            "top_p": top_p,
            "top_k": top_k,
            "temperature": temperature,
            "num_beams": 1,
            "repetition_penalty": repetition_penalty,
        }

        print("Starting generation...")
        # Generation runs in a background thread so this generator can relay
        # text from the streamer while the model is still producing tokens.
        t = Thread(target=model.generate, kwargs=generate_kwargs)
        t.start()
        print("Thread started for model generation.")

        outputs = []
        for text in streamer:
            outputs.append(text)
            # Join once per chunk and reuse the result for the log and yield.
            partial = "".join(outputs)
            print(f"Generated text so far: {partial}")
            yield partial

    except Exception as e:
        print(f"Error during generation: {e}")
        raise
|
|
|
|
|
def password_auth(password):
    """Check the submitted password and toggle the chat / error components.

    Args:
        password: Value typed into the password box.

    Returns:
        A pair of ``gr.update`` objects: the first sets the visibility of the
        chat area, the second sets the visibility (and text) of the
        "incorrect password" message.
    """
    # NOTE(review): this only changes component visibility in the page; it
    # presumably does not restrict direct calls to the app's API endpoints.
    # Consider Gradio's built-in `launch(auth=...)` -- verify.

    # Compare in constant time so response timing does not leak how much of
    # the secret matched. Encoding to bytes lets non-ASCII passwords through
    # (compare_digest rejects non-ASCII str). Non-string input never matches,
    # exactly as with a plain `==` against the string secret.
    is_match = isinstance(password, str) and hmac.compare_digest(
        password.encode("utf-8"), PASSWORD.encode("utf-8")
    )
    if is_match:
        return gr.update(visible=True), gr.update(visible=False)
    return gr.update(visible=False), gr.update(visible=True, value="Incorrect password. Try again.")
|
|
|
# Keyword arguments for the sampling sliders shown with the chat box. They are
# listed in the same order as the sampling parameters of `generate`
# (max_new_tokens, temperature, top_p, top_k, repetition_penalty).
_SLIDER_SPECS = (
    dict(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS),
    dict(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6),
    dict(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9),
    dict(label="Top-k", minimum=1, maximum=1000, step=1, value=50),
    dict(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2),
)

# Sample prompts offered to the user; each becomes a one-element example row.
_EXAMPLE_PROMPTS = (
    "Hello there! How are you doing?",
    "Can you explain briefly to me what is the Python programming language?",
    "Explain the plot of Cinderella in a sentence.",
    "How many hours does it take a man to eat a Helicopter?",
    "Write a 100-word article on 'Benefits of Open-Source in AI research'",
)

# Chat widget backed by `generate`; it is rendered later inside the page
# layout rather than launched on its own.
chat_interface = gr.ChatInterface(
    fn=generate,
    additional_inputs=[gr.Slider(**spec) for spec in _SLIDER_SPECS],
    stop_btn=None,
    examples=[[prompt] for prompt in _EXAMPLE_PROMPTS],
)
|
|
|
|
|
print("Setting up interface...")

# The "Duplicate Space" button is opt-in via the SHOW_DUPLICATE_BUTTON
# environment variable.
_show_duplicate_button = os.getenv("SHOW_DUPLICATE_BUTTON") == "1"

# Page layout: a login row that is visible from the start, and a chat column
# that stays hidden until `password_auth` reveals it.
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation had been lost -- confirm which components belong to the row.
with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)

    with gr.Row(visible=True) as login_area:
        password_input = gr.Textbox(
            label="Enter Password",
            type="password",
            placeholder="Password",
            show_label=True,
        )
        login_btn = gr.Button("Submit")
        # Hidden until a wrong password is submitted.
        incorrect_password_msg = gr.Markdown("Incorrect password. Try again.", visible=False)

    with gr.Column(visible=False) as chat_area:
        # NOTE(review): DESCRIPTION is also rendered at the top of the page,
        # so the heading appears twice once this column is shown -- confirm
        # that is intended.
        gr.Markdown(DESCRIPTION)
        gr.DuplicateButton(
            value="Duplicate Space for private use",
            elem_id="duplicate-button",
            visible=_show_duplicate_button,
        )
        chat_interface.render()

    # Submitting the password updates the chat column and the error message.
    login_btn.click(
        password_auth,
        inputs=password_input,
        outputs=[chat_area, incorrect_password_msg],
    )
|
|
|
|
|
# Printed at import time, whether or not the app is launched below.
print("Launching demo...")

if __name__ == "__main__":
    # Enable the request queue (at most 20 entries), then start the app with
    # sharing requested.
    queued_demo = demo.queue(max_size=20)
    queued_demo.launch(share=True)
|
|