"""Gradio chat demo: Vietnamese prompts are translated to English and answered by a code LLM.

The script loads two models at import time:

* ``vinai/vinai-translate-vi2en-v2`` translates the incoming user message.
* ``deepseek-ai/deepseek-coder-7b-instruct-v1.5`` (8-bit quantised) produces
  the streamed answer.
"""
import os
from threading import Thread
from typing import Iterator

import gradio as gr
import spaces
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSeq2SeqLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TextIteratorStreamer,
)

# 8-bit weight quantisation for the chat model.  The variable name is kept for
# compatibility even though the config is 8-bit, not NF4.  The former
# ``bnb_8bit_use_double_quant`` / ``bnb_8bit_quant_type`` arguments were
# dropped: BitsAndBytesConfig only defines those options for 4-bit
# (``bnb_4bit_*``), so they had no effect.
nf4_config = BitsAndBytesConfig(load_in_8bit=True)

MAX_MAX_NEW_TOKENS = 2048
DEFAULT_MAX_NEW_TOKENS = 1024
total_count = 0  # number of chat requests served by this process
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

# Rewrites for Vietnamese tone-mark placement on "oa", "oe" and "uy"; applied
# to the input text before it is tokenised for translation.
dict_map = {
    "òa": "oà",
    "Òa": "Oà",
    "ÒA": "OÀ",
    "óa": "oá",
    "Óa": "Oá",
    "ÓA": "OÁ",
    "ỏa": "oả",
    "Ỏa": "Oả",
    "ỎA": "OẢ",
    "õa": "oã",
    "Õa": "Oã",
    "ÕA": "OÃ",
    "ọa": "oạ",
    "Ọa": "Oạ",
    "ỌA": "OẠ",
    "òe": "oè",
    "Òe": "Oè",
    "ÒE": "OÈ",
    "óe": "oé",
    "Óe": "Oé",
    "ÓE": "OÉ",
    "ỏe": "oẻ",
    "Ỏe": "Oẻ",
    "ỎE": "OẺ",
    "õe": "oẽ",
    "Õe": "Oẽ",
    "ÕE": "OẼ",
    "ọe": "oẹ",
    "Ọe": "Oẹ",
    "ỌE": "OẸ",
    "ùy": "uỳ",
    "Ùy": "Uỳ",
    "ÙY": "UỲ",
    "úy": "uý",
    "Úy": "Uý",
    "ÚY": "UÝ",
    "ủy": "uỷ",
    "Ủy": "Uỷ",
    "ỦY": "UỶ",
    "ũy": "uỹ",
    "Ũy": "Uỹ",
    "ŨY": "UỸ",
    "ụy": "uỵ",
    "Ụy": "Uỵ",
    "ỤY": "UỴ",
}

tokenizer_vi2en = AutoTokenizer.from_pretrained("vinai/vinai-translate-vi2en-v2", src_lang="vi_VN")
model_vi2en = AutoModelForSeq2SeqLM.from_pretrained("vinai/vinai-translate-vi2en-v2", device_map="auto")


def translate_vi2en(vi_text: str) -> str:
    """Translate Vietnamese text to English with the VinAI vi2en model.

    Args:
        vi_text: Source text.  The replacements in ``dict_map`` are applied
            to it before tokenisation.

    Returns:
        The decoded beam-search output, with returned sequences joined by a
        single space.
    """
    for old, new in dict_map.items():
        vi_text = vi_text.replace(old, new)

    # Follow the device the model was actually placed on (device_map="auto")
    # instead of assuming "cuda".
    input_ids = tokenizer_vi2en(vi_text, return_tensors="pt").input_ids.to(model_vi2en.device)
    output_ids = model_vi2en.generate(
        input_ids,
        decoder_start_token_id=tokenizer_vi2en.lang_code_to_id["en_XX"],
        num_return_sequences=1,
        num_beams=5,
        early_stopping=True,
    )
    en_text = tokenizer_vi2en.batch_decode(output_ids, skip_special_tokens=True)
    return " ".join(en_text)


DESCRIPTION = """CODE"""

model_id = "deepseek-ai/deepseek-coder-7b-instruct-v1.5"
model = AutoModelForCausalLM.from_pretrained(model_id, device_map="auto", quantization_config=nf4_config)
tokenizer = AutoTokenizer.from_pretrained(model_id)
# Attribute name fixed from the misspelled ``use_defaul_system_prompt``, which
# only created an unused attribute.
# NOTE(review): whether this flag has any effect depends on the tokenizer
# class and transformers version -- confirm.
tokenizer.use_default_system_prompt = True
os.system("nvidia-smi")


@spaces.GPU
def gen(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int = 1024,
    temperature: float = 0.6,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1,
) -> Iterator[str]:
    """Stream the chat model's reply to ``message``.

    The new user message is translated to English with ``translate_vi2en``
    before being appended to the conversation; earlier turns from
    ``chat_history`` are used as given.

    Args:
        message: Latest user message.
        chat_history: Previous ``(user, assistant)`` turns.
        system_prompt: Optional system message; skipped when empty.
        max_new_tokens: Upper bound on generated tokens.
        temperature: Accepted for interface compatibility; unused because
            decoding is greedy (``do_sample=False``).
        top_p: Accepted for interface compatibility; unused under greedy
            decoding.
        top_k: Accepted for interface compatibility; unused under greedy
            decoding.
        repetition_penalty: Repetition penalty forwarded to ``generate``.

    Yields:
        The accumulated reply text after each streamed chunk.
    """
    global total_count
    total_count += 1
    print(total_count)
    os.system("nvidia-smi")

    message = translate_vi2en(message)

    conversation = []
    if system_prompt:
        conversation.append({"role": "system", "content": system_prompt})
    for user, assistant in chat_history:
        conversation.append({"role": "user", "content": user})
        conversation.append({"role": "assistant", "content": assistant})
    conversation.append({"role": "user", "content": message})

    # add_generation_prompt=True makes the prompt end with the assistant turn
    # marker, so the model starts answering instead of continuing the user turn.
    input_ids = tokenizer.apply_chat_template(
        conversation,
        add_generation_prompt=True,
        return_tensors="pt",
    )
    if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH:
        # Keep the most recent tokens when the conversation is too long.
        input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:]
        gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.")
    input_ids = input_ids.to(model.device)

    streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True)
    generate_kwargs = dict(
        input_ids=input_ids,
        streamer=streamer,
        max_new_tokens=max_new_tokens,
        # Greedy decoding: sampling-only options (temperature/top_p/top_k) are
        # not forwarded, since they would be ignored and only trigger warnings.
        do_sample=False,
        num_beams=1,
        repetition_penalty=repetition_penalty,
        # Take the stop token from the loaded tokenizer rather than a
        # hard-coded id, so it always matches ``model_id``.
        eos_token_id=tokenizer.eos_token_id,
    )

    # generate() blocks, so it runs in a worker thread while this generator
    # drains the streamer.
    t = Thread(target=model.generate, kwargs=generate_kwargs)
    t.start()

    outputs = []
    for text in streamer:
        outputs.append(text)
        yield "".join(outputs).replace("<|EOT|>", "")
    t.join()


def _chat_fn(
    message: str,
    chat_history: list[tuple[str, str]],
    system_prompt: str,
    max_new_tokens: int,
    top_p: float,
    top_k: int,
    repetition_penalty: float,
) -> Iterator[str]:
    """Adapt the UI's additional inputs to ``gen`` by keyword.

    Gradio passes ``additional_inputs`` positionally.  The UI has no
    temperature control, so handing ``gen`` to the interface directly would
    shift every value after ``max_new_tokens`` by one parameter (top-p into
    ``temperature``, top-k into ``top_p``, repetition penalty into ``top_k``).
    """
    yield from gen(
        message,
        chat_history,
        system_prompt,
        max_new_tokens=max_new_tokens,
        top_p=top_p,
        top_k=top_k,
        repetition_penalty=repetition_penalty,
    )


chat_interface = gr.ChatInterface(
    fn=_chat_fn,
    additional_inputs=[
        gr.Textbox(label="System prompt", lines=6),
        gr.Slider(
            label="Max new tokens",
            minimum=1,
            maximum=MAX_MAX_NEW_TOKENS,
            step=1,
            value=DEFAULT_MAX_NEW_TOKENS,
        ),
        gr.Slider(
            label="Top-p (nucleus sampling)",
            minimum=0.05,
            maximum=1.0,
            step=0.05,
            value=0.9,
        ),
        gr.Slider(
            label="Top-k",
            minimum=1,
            maximum=1000,
            step=1,
            value=50,
        ),
        gr.Slider(
            label="Repetition penalty",
            minimum=1.0,
            maximum=2.0,
            step=0.05,
            value=1,
        ),
    ],
    stop_btn=gr.Button("Stop"),
    examples=[
        ["implement snake game using pygame"],
        ["Can you explain briefly to me what is the Python programming language?"],
        ["write a program to find the factorial of a number"],
    ],
)

with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    chat_interface.render()

if __name__ == "__main__":
    demo.queue(max_size=20).launch()