import spaces
import gradio as gr
from PIL import Image
import traceback
import re
import torch
import argparse
from transformers import AutoModel, AutoTokenizer
					
						
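# Pick the compute device from the command line; only CUDA GPUs and Apple Silicon ('mps') are supported.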
					
						
parser = argparse.ArgumentParser(description='demo')
parser.add_argument('--device', type=str, default='cuda', help='cuda or mps')
args = parser.parse_args()
device = args.device
assert device in ['cuda', 'mps']
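# Load MiniCPM-Llama3-V 2.5 from the Hugging Face Hub. int4 checkpoints depend on
# bitsandbytes and are therefore rejected on 'mps'; full-precision weights are cast
# to fp16 and moved to the selected device.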
					
						
model_path = 'openbmb/MiniCPM-Llama3-V-2_5'
if 'int4' in model_path:
    if device == 'mps':
        print('Error: running int4 model with bitsandbytes on Mac is not supported right now.')
        exit()
    model = AutoModel.from_pretrained(model_path, trust_remote_code=True)
else:
    model = AutoModel.from_pretrained(model_path, trust_remote_code=True).to(dtype=torch.float16)
    model = model.to(device=device)
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model.eval()
					
						
ERROR_MSG = "Error, please retry"
model_name = 'MiniCPM-Llama3-V 2.5'
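# Configuration dicts for the Gradio controls; create_component() below turns each
# one into the matching gr.Radio / gr.Slider.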
					
						
form_radio = {
    'choices': ['Beam Search', 'Sampling'],
    'value': 'Sampling',
    'interactive': True,
    'label': 'Decode Type'
}
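# Slider configs for the decoding controls (max_new_tokens_slider is defined here
# but never added to the layout below).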
					
						
num_beams_slider = {
    'minimum': 0,
    'maximum': 5,
    'value': 3,
    'step': 1,
    'interactive': True,
    'label': 'Num Beams'
}
repetition_penalty_slider = {
    'minimum': 0,
    'maximum': 3,
    'value': 1.2,
    'step': 0.01,
    'interactive': True,
    'label': 'Repetition Penalty'
}
repetition_penalty_slider2 = {
    'minimum': 0,
    'maximum': 3,
    'value': 1.05,
    'step': 0.01,
    'interactive': True,
    'label': 'Repetition Penalty'
}
max_new_tokens_slider = {
    'minimum': 1,
    'maximum': 4096,
    'value': 1024,
    'step': 1,
    'interactive': True,
    'label': 'Max New Tokens'
}
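# Slider configs for the sampling controls.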
					
						
top_p_slider = {
    'minimum': 0,
    'maximum': 1,
    'value': 0.8,
    'step': 0.05,
    'interactive': True,
    'label': 'Top P'
}
top_k_slider = {
    'minimum': 0,
    'maximum': 200,
    'value': 100,
    'step': 1,
    'interactive': True,
    'label': 'Top K'
}
temperature_slider = {
    'minimum': 0,
    'maximum': 2,
    'value': 0.7,
    'step': 0.05,
    'interactive': True,
    'label': 'Temperature'
}
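# Helper that builds a Gradio component from one of the config dicts above.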
					
						
def create_component(params, comp='Slider'):
    if comp == 'Slider':
        return gr.Slider(
            minimum=params['minimum'],
            maximum=params['maximum'],
            value=params['value'],
            step=params['step'],
            interactive=params['interactive'],
            label=params['label']
        )
    elif comp == 'Radio':
        return gr.Radio(
            choices=params['choices'],
            value=params['value'],
            interactive=params['interactive'],
            label=params['label']
        )
    elif comp == 'Button':
        return gr.Button(
            value=params['value'],
            interactive=True
        )
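# Inference entry point. On Hugging Face Spaces (ZeroGPU) the @spaces.GPU decorator
# requests a GPU for up to `duration` seconds per call.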
					
						
@spaces.GPU(duration=120)
def chat(img, msgs, ctx, params=None, vision_hidden_states=None):
    default_params = {"stream": False, "sampling": False, "num_beams": 3, "repetition_penalty": 1.2, "max_new_tokens": 1024}
    if params is None:
        params = default_params
    if img is None:
        yield "Error, invalid image, please upload a new image"
    else:
        try:
            image = img.convert('RGB')
            answer = model.chat(
                image=image,
                msgs=msgs,
                tokenizer=tokenizer,
                **params
            )
            for char in answer:
                yield char
        except Exception as err:
            print(err)
            traceback.print_exc()
            yield ERROR_MSG
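# Store the uploaded picture in the per-session state and reset the chat context.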
					
						
def upload_img(image, _chatbot, _app_session):
    image = Image.fromarray(image)

    _app_session['sts'] = None
    _app_session['ctx'] = []
    _app_session['img'] = image
    _chatbot.append(('', 'Image uploaded successfully, you can talk to me now'))
    return _chatbot, _app_session
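# Build decoding parameters from the UI controls and stream the model's reply into
# the last chatbot turn.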
					
						
def respond(_chat_bot, _app_cfg, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature):
    _question = _chat_bot[-1][0]
    print('<Question>:', _question)
    if _app_cfg.get('ctx', None) is None:
        _chat_bot[-1][1] = 'Please upload an image to start'
        yield (_chat_bot, _app_cfg)
    else:
        _context = _app_cfg['ctx'].copy()
        if _context:
            _context.append({"role": "user", "content": _question})
        else:
            _context = [{"role": "user", "content": _question}]
        if params_form == 'Beam Search':
            params = {
                'sampling': False,
                'stream': False,
                'num_beams': num_beams,
                'repetition_penalty': repetition_penalty,
                "max_new_tokens": 896
            }
        else:
            params = {
                'sampling': True,
                'stream': True,
                'top_p': top_p,
                'top_k': top_k,
                'temperature': temperature,
                'repetition_penalty': repetition_penalty_2,
                "max_new_tokens": 896
            }

        gen = chat(_app_cfg['img'], _context, None, params)
        _chat_bot[-1][1] = ""
        # Accumulate the streamed reply into an assistant turn, not the user message.
        _context.append({"role": "assistant", "content": ""})
        for _char in gen:
            _chat_bot[-1][1] += _char
            _context[-1]["content"] += _char
            yield (_chat_bot, _app_cfg)
					
						
def request(_question, _chat_bot, _app_cfg):
    _chat_bot.append((_question, None))
    return '', _chat_bot, _app_cfg
					
						
def regenerate_button_clicked(_question, _chat_bot, _app_cfg):
    if len(_chat_bot) <= 1:
        _chat_bot.append(('Regenerate', 'No question for regeneration.'))
        return '', _chat_bot, _app_cfg
    elif _chat_bot[-1][0] == 'Regenerate':
        return '', _chat_bot, _app_cfg
    else:
        _question = _chat_bot[-1][0]
        _chat_bot = _chat_bot[:-1]
        _app_cfg['ctx'] = _app_cfg['ctx'][:-2]
        return request(_question, _chat_bot, _app_cfg)
					
						
def clear_button_clicked(_question, _chat_bot, _app_cfg, _bt_pic):
    _chat_bot.clear()
    _app_cfg['sts'] = None
    _app_cfg['ctx'] = None
    _app_cfg['img'] = None
    _bt_pic = None
    return '', _chat_bot, _app_cfg, _bt_pic
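# UI layout: decoding controls on the left, image upload and chat on the right.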
					
						
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=1, min_width=300):
            params_form = create_component(form_radio, comp='Radio')
            with gr.Accordion("Beam Search") as beams_according:
                num_beams = create_component(num_beams_slider)
                repetition_penalty = create_component(repetition_penalty_slider)
            with gr.Accordion("Sampling") as sampling_according:
                top_p = create_component(top_p_slider)
                top_k = create_component(top_k_slider)
                temperature = create_component(temperature_slider)
                repetition_penalty_2 = create_component(repetition_penalty_slider2)
            regenerate = create_component({'value': 'Regenerate'}, comp='Button')
            clear = create_component({'value': 'Clear'}, comp='Button')
        with gr.Column(scale=3, min_width=500):
            app_session = gr.State({'sts': None, 'ctx': None, 'img': None})
            bt_pic = gr.Image(label="Upload an image to start")
            chat_bot = gr.Chatbot(label=f"Chat with {model_name}")
            txt_message = gr.Textbox(label="Input text")

            clear.click(
                clear_button_clicked,
                [txt_message, chat_bot, app_session, bt_pic],
                [txt_message, chat_bot, app_session, bt_pic],
                queue=False
            )
            txt_message.submit(
                request,
                [txt_message, chat_bot, app_session],
                [txt_message, chat_bot, app_session],
                queue=False
            ).then(
                respond,
                [chat_bot, app_session, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature],
                [chat_bot, app_session]
            )
            regenerate.click(
                regenerate_button_clicked,
                [txt_message, chat_bot, app_session],
                [txt_message, chat_bot, app_session],
                queue=False
            ).then(
                respond,
                [chat_bot, app_session, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature],
                [chat_bot, app_session]
            )
            bt_pic.upload(lambda: None, None, chat_bot, queue=False).then(upload_img, inputs=[bt_pic, chat_bot, app_session], outputs=[chat_bot, app_session])
					
						
demo.queue()
demo.launch()
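# To try the demo locally (assuming this file is saved as, e.g., app.py):
#   python app.py --device cuda   # NVIDIA GPU
#   python app.py --device mps    # Apple Silicon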