#!/usr/bin/env python
# encoding: utf-8
import spaces
import torch
@spaces.GPU
def debug():
torch.randn(10).cuda()
debug()
import argparse
from transformers import AutoModel, AutoTokenizer
import gradio as gr
from PIL import Image
from decord import VideoReader, cpu
import io
import os
os.system("nvidia-smi")
import copy
import requests
import base64
import json
import traceback
import re
import modelscope_studio as mgr
from modelscope.hub.snapshot_download import snapshot_download
model_dir = snapshot_download('iic/mPLUG-Owl3-7B-240728', cache_dir='./')
os.system('ls')
# README, How to run demo on different devices
# For Nvidia GPUs.
# python web_demo_2.6.py --device cuda
# For Mac with MPS (Apple silicon or AMD GPUs).
# PYTORCH_ENABLE_MPS_FALLBACK=1 python web_demo_2.6.py --device mps
# Argparser
parser = argparse.ArgumentParser(description='demo')
parser.add_argument('--device', type=str, default='cuda', help='cuda or mps')
parser.add_argument("--host", type=str, default="0.0.0.0")
parser.add_argument("--port", type=int)
args = parser.parse_args()
device = args.device
assert device in ['cuda', 'mps']
# Load model
model_path = './iic/mPLUG-Owl3-7B-240728'
if 'int4' in model_path:
if device == 'mps':
print('Error: running int4 model with bitsandbytes on Mac is not supported right now.')
exit()
model = AutoModel.from_pretrained(model_path, attn_implementation='sdpa', trust_remote_code=True)
else:
model = AutoModel.from_pretrained(model_path, attn_implementation='sdpa', trust_remote_code=True, torch_dtype=torch.bfloat16)
model = model.to(device=device)
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model.eval()
ERROR_MSG = "Error, please retry"
model_name = 'mPLUG-Owl3'
MAX_NUM_FRAMES = 64
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff', '.webp'}
VIDEO_EXTENSIONS = {'.mp4', '.mkv', '.mov', '.avi', '.flv', '.wmv', '.webm', '.m4v'}
def get_file_extension(filename):
return os.path.splitext(filename)[1].lower()
def is_image(filename):
return get_file_extension(filename) in IMAGE_EXTENSIONS
def is_video(filename):
return get_file_extension(filename) in VIDEO_EXTENSIONS
form_radio = {
'choices': ['Beam Search', 'Sampling'],
#'value': 'Beam Search',
'value': 'Sampling',
'interactive': True,
'label': 'Decode Type'
}
def create_component(params, comp='Slider'):
if comp == 'Slider':
return gr.Slider(
minimum=params['minimum'],
maximum=params['maximum'],
value=params['value'],
step=params['step'],
interactive=params['interactive'],
label=params['label']
)
elif comp == 'Radio':
return gr.Radio(
choices=params['choices'],
value=params['value'],
interactive=params['interactive'],
label=params['label']
)
elif comp == 'Button':
return gr.Button(
value=params['value'],
interactive=True
)
def create_multimodal_input(upload_image_disabled=False, upload_video_disabled=False):
return mgr.MultimodalInput(upload_image_button_props={'label': 'Upload Image', 'disabled': upload_image_disabled, 'file_count': 'multiple'},
upload_video_button_props={'label': 'Upload Video', 'disabled': upload_video_disabled, 'file_count': 'single'},
submit_button_props={'label': 'Submit'})
@spaces.GPU
def chat(img, msgs, ctx, params=None, vision_hidden_states=None):
try:
print('msgs:', msgs)
images = []
videos = []
messages = []
for line in msgs:
s = ""
for item in line['content']:
if isinstance(item, str):
s+=item
else:
s+='<|image|>'
images.append(item)
messages.append({"role": line['role'], "content": s})
messages.append({"role": "assistant", "content": ""})
answer = model.chat(
images=images,
videos=videos,
messages=messages,
tokenizer=tokenizer,
**params
)
res = re.sub(r'(.*)', '', answer)
res = res.replace('[', '')
res = res.replace(']', '')
res = res.replace('', '')
answer = res.replace('', '')
print('answer:', answer)
return 0, answer, None, None
except Exception as e:
print(e)
traceback.print_exc()
return -1, ERROR_MSG, None, None
def encode_image(image):
if not isinstance(image, Image.Image):
if hasattr(image, 'path'):
image = Image.open(image.path).convert("RGB")
else:
image = Image.open(image.file.path).convert("RGB")
# resize to max_size
max_size = 448*16
if max(image.size) > max_size:
w,h = image.size
if w > h:
new_w = max_size
new_h = int(h * max_size / w)
else:
new_h = max_size
new_w = int(w * max_size / h)
image = image.resize((new_w, new_h), resample=Image.BICUBIC)
return image
## save by BytesIO and convert to base64
#buffered = io.BytesIO()
#image.save(buffered, format="png")
#im_b64 = base64.b64encode(buffered.getvalue()).decode()
#return {"type": "image", "pairs": im_b64}
def encode_video(video):
def uniform_sample(l, n):
gap = len(l) / n
idxs = [int(i * gap + gap / 2) for i in range(n)]
return [l[i] for i in idxs]
if hasattr(video, 'path'):
vr = VideoReader(video.path, ctx=cpu(0))
else:
vr = VideoReader(video.file.path, ctx=cpu(0))
sample_fps = round(vr.get_avg_fps() / 1) # FPS
frame_idx = [i for i in range(0, len(vr), sample_fps)]
if len(frame_idx)>MAX_NUM_FRAMES:
frame_idx = uniform_sample(frame_idx, MAX_NUM_FRAMES)
video = vr.get_batch(frame_idx).asnumpy()
video = [Image.fromarray(v.astype('uint8')) for v in video]
video = [encode_image(v) for v in video]
print('video frames:', len(video))
return video
def check_mm_type(mm_file):
if hasattr(mm_file, 'path'):
path = mm_file.path
else:
path = mm_file.file.path
if is_image(path):
return "image"
if is_video(path):
return "video"
return None
def encode_mm_file(mm_file):
if check_mm_type(mm_file) == 'image':
return [encode_image(mm_file)]
if check_mm_type(mm_file) == 'video':
return encode_video(mm_file)
return None
def make_text(text):
#return {"type": "text", "pairs": text} # # For remote call
return text
def encode_message(_question):
files = _question.files
question = _question.text
pattern = r"\[mm_media\]\d+\[/mm_media\]"
matches = re.split(pattern, question)
message = []
if len(matches) != len(files) + 1:
gr.Warning("Number of Images not match the placeholder in text, please refresh the page to restart!")
assert len(matches) == len(files) + 1
text = matches[0].strip()
if text:
message.append(make_text(text))
for i in range(len(files)):
message += encode_mm_file(files[i])
text = matches[i + 1].strip()
if text:
message.append(make_text(text))
return message
def check_has_videos(_question):
images_cnt = 0
videos_cnt = 0
for file in _question.files:
if check_mm_type(file) == "image":
images_cnt += 1
else:
videos_cnt += 1
return images_cnt, videos_cnt
def count_video_frames(_context):
num_frames = 0
for message in _context:
for item in message["content"]:
#if item["type"] == "image": # For remote call
if isinstance(item, Image.Image):
num_frames += 1
return num_frames
def respond(_question, _chat_bot, _app_cfg, params_form):
_context = _app_cfg['ctx'].copy()
_context.append({'role': 'user', 'content': encode_message(_question)})
images_cnt = _app_cfg['images_cnt']
videos_cnt = _app_cfg['videos_cnt']
files_cnts = check_has_videos(_question)
if files_cnts[1] + videos_cnt > 1 or (files_cnts[1] + videos_cnt == 1 and files_cnts[0] + images_cnt > 0):
gr.Warning("Only supports single video file input right now!")
return _question, _chat_bot, _app_cfg
if params_form == 'Beam Search':
params = {
'sampling': False,
'num_beams': 3,
'repetition_penalty': 1.2,
"max_new_tokens": 2048
}
else:
params = {
'sampling': True,
'top_p': 0.8,
'top_k': 100,
'temperature': 0.7,
'repetition_penalty': 1.05,
"max_new_tokens": 2048
}
if files_cnts[1] + videos_cnt > 0:
params["max_inp_length"] = 4352 # 4096+256
params["use_image_id"] = False
params["max_slice_nums"] = 1 if count_video_frames(_context) > 16 else 2
code, _answer, _, sts = chat("", _context, None, params)
images_cnt += files_cnts[0]
videos_cnt += files_cnts[1]
_context.append({"role": "assistant", "content": [make_text(_answer)]})
_chat_bot.append((_question, _answer))
if code == 0:
_app_cfg['ctx']=_context
_app_cfg['sts']=sts
_app_cfg['images_cnt'] = images_cnt
_app_cfg['videos_cnt'] = videos_cnt
upload_image_disabled = videos_cnt > 0
upload_video_disabled = videos_cnt > 0 or images_cnt > 0
return create_multimodal_input(upload_image_disabled, upload_video_disabled), _chat_bot, _app_cfg
def fewshot_add_demonstration(_image, _user_message, _assistant_message, _chat_bot, _app_cfg):
ctx = _app_cfg["ctx"]
message_item = []
if _image is not None:
image = Image.open(_image).convert("RGB")
ctx.append({"role": "user", "content": [encode_image(image), make_text(_user_message)]})
message_item.append({"text": "[mm_media]1[/mm_media]" + _user_message, "files": [_image]})
else:
if _user_message:
ctx.append({"role": "user", "content": [make_text(_user_message)]})
message_item.append({"text": _user_message, "files": []})
else:
message_item.append(None)
if _assistant_message:
ctx.append({"role": "assistant", "content": [make_text(_assistant_message)]})
message_item.append({"text": _assistant_message, "files": []})
else:
message_item.append(None)
_chat_bot.append(message_item)
return None, "", "", _chat_bot, _app_cfg
def fewshot_respond(_image, _user_message, _chat_bot, _app_cfg, params_form):
user_message_contents = []
_context = _app_cfg["ctx"].copy()
if _image:
image = Image.open(_image).convert("RGB")
user_message_contents += [encode_image(image)]
if _user_message:
user_message_contents += [make_text(_user_message)]
if user_message_contents:
_context.append({"role": "user", "content": user_message_contents})
if params_form == 'Beam Search':
params = {
'sampling': False,
'num_beams': 3,
'repetition_penalty': 1.2,
"max_new_tokens": 2048
}
else:
params = {
'sampling': True,
'top_p': 0.8,
'top_k': 100,
'temperature': 0.7,
'repetition_penalty': 1.05,
"max_new_tokens": 2048
}
code, _answer, _, sts = chat("", _context, None, params)
_context.append({"role": "assistant", "content": [make_text(_answer)]})
if _image:
_chat_bot.append([
{"text": "[mm_media]1[/mm_media]" + _user_message, "files": [_image]},
{"text": _answer, "files": []}
])
else:
_chat_bot.append([
{"text": _user_message, "files": [_image]},
{"text": _answer, "files": []}
])
if code == 0:
_app_cfg['ctx']=_context
_app_cfg['sts']=sts
return None, '', '', _chat_bot, _app_cfg
def regenerate_button_clicked(_question, _image, _user_message, _assistant_message, _chat_bot, _app_cfg, params_form):
if len(_chat_bot) <= 1 or not _chat_bot[-1][1]:
gr.Warning('No question for regeneration.')
return '', _image, _user_message, _assistant_message, _chat_bot, _app_cfg
if _app_cfg["chat_type"] == "Chat":
images_cnt = _app_cfg['images_cnt']
videos_cnt = _app_cfg['videos_cnt']
_question = _chat_bot[-1][0]
_chat_bot = _chat_bot[:-1]
_app_cfg['ctx'] = _app_cfg['ctx'][:-2]
files_cnts = check_has_videos(_question)
images_cnt -= files_cnts[0]
videos_cnt -= files_cnts[1]
_app_cfg['images_cnt'] = images_cnt
_app_cfg['videos_cnt'] = videos_cnt
upload_image_disabled = videos_cnt > 0
upload_video_disabled = videos_cnt > 0 or images_cnt > 0
_question, _chat_bot, _app_cfg = respond(_question, _chat_bot, _app_cfg, params_form)
return _question, _image, _user_message, _assistant_message, _chat_bot, _app_cfg
else:
last_message = _chat_bot[-1][0]
last_image = None
last_user_message = ''
if last_message.text:
last_user_message = last_message.text
if last_message.files:
last_image = last_message.files[0].file.path
_chat_bot = _chat_bot[:-1]
_app_cfg['ctx'] = _app_cfg['ctx'][:-2]
_image, _user_message, _assistant_message, _chat_bot, _app_cfg = fewshot_respond(last_image, last_user_message, _chat_bot, _app_cfg, params_form)
return _question, _image, _user_message, _assistant_message, _chat_bot, _app_cfg
def flushed():
return gr.update(interactive=True)
def clear(txt_message, chat_bot, app_session):
txt_message.files.clear()
txt_message.text = ''
chat_bot = copy.deepcopy(init_conversation)
app_session['sts'] = None
app_session['ctx'] = []
app_session['images_cnt'] = 0
app_session['videos_cnt'] = 0
return create_multimodal_input(), chat_bot, app_session, None, '', ''
def select_chat_type(_tab, _app_cfg):
_app_cfg["chat_type"] = _tab
return _app_cfg
init_conversation = [
[
None,
{
# The first message of bot closes the typewriter.
"text": "You can talk to me now",
"flushing": False
}
],
]
css = """
video { height: auto !important; }
.example label { font-size: 16px;}
"""
introduction = """
##
Github:
[mPLUG-Owl](https://github.com/X-PLUG/mPLUG-Owl)
Checkpoint:
[mPLUG-Owl3-7B-240728](https://huggingface.co/mPLUG/mPLUG-Owl3-7B-240728)
Paper:
[mPLUG-Owl3: Towards Long Image-Sequence Understanding in Multi-Modal Large Language Models
](https://arxiv.org/abs/2408.04840)
"""
with gr.Blocks(css=css) as demo:
with gr.Tab(model_name):
with gr.Row():
with gr.Column(scale=1, min_width=300):
gr.Markdown(value=introduction)
params_form = create_component(form_radio, comp='Radio')
regenerate = create_component({'value': 'Regenerate'}, comp='Button')
clear_button = create_component({'value': 'Clear History'}, comp='Button')
with gr.Column(scale=3, min_width=500):
app_session = gr.State({'sts':None,'ctx':[], 'images_cnt': 0, 'videos_cnt': 0, 'chat_type': 'Chat'})
chat_bot = mgr.Chatbot(label=f"Chat with {model_name}", value=copy.deepcopy(init_conversation), height=600, flushing=False, bubble_full_width=False)
with gr.Tab("Chat") as chat_tab:
txt_message = create_multimodal_input()
chat_tab_label = gr.Textbox(value="Chat", interactive=False, visible=False)
txt_message.submit(
respond,
[txt_message, chat_bot, app_session, params_form],
[txt_message, chat_bot, app_session]
)
with gr.Tab("Few Shot", visible=False) as fewshot_tab:
fewshot_tab_label = gr.Textbox(value="Few Shot", interactive=False, visible=False)
with gr.Row():
with gr.Column(scale=1):
image_input = gr.Image(type="filepath", sources=["upload"])
with gr.Column(scale=3):
user_message = gr.Textbox(label="User")
assistant_message = gr.Textbox(label="Assistant")
with gr.Row():
add_demonstration_button = gr.Button("Add Example")
generate_button = gr.Button(value="Generate", variant="primary")
add_demonstration_button.click(
fewshot_add_demonstration,
[image_input, user_message, assistant_message, chat_bot, app_session],
[image_input, user_message, assistant_message, chat_bot, app_session]
)
generate_button.click(
fewshot_respond,
[image_input, user_message, chat_bot, app_session, params_form],
[image_input, user_message, assistant_message, chat_bot, app_session]
)
chat_tab.select(
select_chat_type,
[chat_tab_label, app_session],
[app_session]
)
chat_tab.select( # do clear
clear,
[txt_message, chat_bot, app_session],
[txt_message, chat_bot, app_session, image_input, user_message, assistant_message]
)
fewshot_tab.select(
select_chat_type,
[fewshot_tab_label, app_session],
[app_session]
)
fewshot_tab.select( # do clear
clear,
[txt_message, chat_bot, app_session],
[txt_message, chat_bot, app_session, image_input, user_message, assistant_message]
)
chat_bot.flushed(
flushed,
outputs=[txt_message]
)
regenerate.click(
regenerate_button_clicked,
[txt_message, image_input, user_message, assistant_message, chat_bot, app_session, params_form],
[txt_message, image_input, user_message, assistant_message, chat_bot, app_session]
)
clear_button.click(
clear,
[txt_message, chat_bot, app_session],
[txt_message, chat_bot, app_session, image_input, user_message, assistant_message]
)
# launch
demo.launch(share=False, debug=True, show_api=False, server_port=args.port, server_name=args.host)