Spaces:
Running
Running
| # Copyright (c) Kuaishou. | |
| # | |
| # This source code is licensed under the license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| import os | |
| import base64 | |
| import numpy as np | |
| os.system('pip install huggingface_hub gradio openai keye_vl_utils torch torchvision -U') | |
| from argparse import ArgumentParser | |
| from pathlib import Path | |
| import copy | |
| import gradio as gr | |
| import os | |
| import re | |
| import tempfile | |
| from openai import OpenAI | |
| from PIL import Image | |
| from io import BytesIO | |
| from keye_vl_utils import fetch_video, process_vision_info | |
def _get_args():
    """Parse the demo's command-line options.

    Reads the process arguments (``sys.argv``) through ``argparse``.

    Returns:
        The parsed namespace with the attributes ``share``, ``inbrowser``,
        ``server_port`` and ``server_name``.
    """
    cli = ArgumentParser()

    # One (flag, add_argument keyword arguments) entry per supported option,
    # registered in this order.
    option_table = (
        ("--share", {
            "action": "store_true",
            "default": False,
            "help": "Create a publicly shareable link for the interface.",
        }),
        ("--inbrowser", {
            "action": "store_true",
            "default": False,
            "help": "Automatically launch the interface in a new tab on the default browser.",
        }),
        ("--server-port", {
            "type": int,
            "default": 7860,
            "help": "Demo server port.",
        }),
        ("--server-name", {
            "type": str,
            "default": "127.0.0.1",
            "help": "Demo server name.",
        }),
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    return cli.parse_args()
# Human-readable labels substituted for the model's structural tags.
_TAG_LABELS = (
    ("<think>", "**思考过程开始**：\n"),
    ("</think>", "\n**思考过程结束**\n"),
    ("<answer>", "**回答开始**：\n"),
    ("</answer>", "\n**回答结束**\n"),
    ("<analysis>", "**分析开始**：\n"),
    ("</analysis>", "\n**分析结束**\n"),
)

# Characters neutralised on lines inside a fenced code block so that the chat
# widget shows them literally instead of interpreting them as HTML/Markdown.
# No replacement text contains another key, so a single translate() pass
# gives the same result as applying the replacements one after another.
_CODE_LINE_ESCAPES = str.maketrans({
    "`": r"\`",
    "<": "&lt;",
    ">": "&gt;",
    " ": "&nbsp;",
    "*": "&ast;",
    "_": "&lowbar;",
    "-": "&#45;",
    ".": "&#46;",
    "!": "&#33;",
    "(": "&#40;",
    ")": "&#41;",
    "$": "&#36;",
})


def _parse_text(text):
    """Convert raw chat text into the HTML fragment displayed in the chatbot.

    Processing is line based:

    * empty lines are dropped;
    * ``<think>``, ``<answer>`` and ``<analysis>`` tags (and their closing
      forms) are replaced by bold text labels;
    * a line containing a triple backtick toggles "inside code block" state:
      the opening fence becomes ``<pre><code class="language-X">`` where ``X``
      is whatever follows the last backtick, the closing fence becomes
      ``<br></code></pre>``;
    * every other line is prefixed with ``<br>``; while inside a code block
      its special characters are additionally entity-escaped.

    Args:
        text: The raw string to render.

    Returns:
        The converted lines concatenated without separators; ``""`` when
        *text* contains no non-empty line.
    """
    lines = [line for line in text.split("\n") if line != ""]
    fence_count = 0
    for i, line in enumerate(lines):
        for tag, label in _TAG_LABELS:
            line = line.replace(tag, label)

        if "```" in line:
            fence_count += 1
            if fence_count % 2 == 1:
                # Opening fence: the text after the last backtick names the language.
                language = line.split("`")[-1]
                lines[i] = f'<pre><code class="language-{language}">'
            else:
                lines[i] = "<br></code></pre>"
            continue

        if fence_count % 2 == 1:
            # Inside an open fence: escape so code is shown verbatim.
            line = line.translate(_CODE_LINE_ESCAPES)
        lines[i] = "<br>" + line

    return "".join(lines)
def is_video_file(filename):
    """Return True when *filename* ends with a known video extension.

    The comparison is case-insensitive and looks only at the end of the
    string, not at the file's contents.
    """
    known_suffixes = ('.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.mpeg')
    lowered = filename.lower()
    return lowered.endswith(known_suffixes)
def video_processor(video_path):
    """Turn a video file into a ``video_url`` message part of base64 JPEG frames.

    Frames are obtained from ``fetch_video`` (at most 8 frames, pixel budget
    ``256*28*28``), each frame is JPEG-encoded and base64-encoded, and the
    encoded frames are joined with commas into one ``data:video/jpeg;base64,``
    URL.

    Args:
        video_path: Location of the video, passed to ``fetch_video`` as the
            ``"video"`` entry.

    Returns:
        A dict ``{"type": "video_url", "video_url": {"url": ...}}``.
    """
    request = {"video": video_path, "max_frames": 8, "max_pixels": 256*28*28}
    # The sampling rate is returned as well but is not needed here.
    frame_tensor, _sample_fps = fetch_video(request, return_video_sample_fps=True)
    # Presumably (frames, channels, H, W) -> (frames, H, W, channels), the layout
    # PIL expects -- TODO confirm against keye_vl_utils.fetch_video.
    frame_array = frame_tensor.permute(0, 2, 3, 1).numpy().astype(np.uint8)

    def _to_base64_jpeg(frame):
        # Encode one frame as JPEG in memory and return it as base64 text.
        buffer = BytesIO()
        Image.fromarray(frame).save(buffer, format="jpeg")
        return base64.b64encode(buffer.getvalue()).decode("utf-8")

    encoded_frames = [_to_base64_jpeg(frame) for frame in frame_array]
    return {
        "type": "video_url",
        "video_url": {"url": f"data:video/jpeg;base64,{','.join(encoded_frames)}"},
    }
def _launch_demo(args):
    """Build the Gradio chat interface and serve it.

    An OpenAI-compatible client is created from the ``ENDPOINT_URL`` and
    ``HF_TOKEN`` environment variables; the nested callbacks below use it to
    stream completions from the ``Kwai-Keye/Keye-VL-1_5-8B`` model.

    Two parallel histories are kept:

    * the chatbot value, a list of ``(user, assistant)`` pairs as displayed;
    * ``task_history`` (a ``gr.State``), the same turns with the raw user
      input, used to rebuild the request messages.

    Args:
        args: Parsed command-line namespace; only ``args.share`` is read.
    """
    EP_URL = os.getenv("ENDPOINT_URL")
    HF_TOKEN = os.getenv("HF_TOKEN")
    client = OpenAI(
        base_url=f"{EP_URL}/v1/",
        api_key=HF_TOKEN
    )

    def predict(_chatbot, task_history):
        """Stream the model's reply to the last turn, yielding the chatbot after each chunk."""
        chat_query = _chatbot[-1][0]
        query = task_history[-1][0]
        if len(chat_query) == 0:
            # Empty submission: drop the turn that was just appended.
            _chatbot.pop()
            task_history.pop()
            # This function is a generator, so the updated history has to be
            # yielded; a plain ``return value`` would emit nothing to the UI.
            yield _chatbot
            return
        print("User: " + _parse_text(query))
        history_cp = copy.deepcopy(task_history)

        # Rebuild the conversation. File turns (tuple/list entries) are
        # accumulated and attached to the next text turn's user message.
        messages = []
        content = []
        for q, a in history_cp:
            if isinstance(q, (tuple, list)):
                if is_video_file(q[0]):
                    video_info = video_processor(q[0])
                    content.append(video_info)
                else:
                    # convert image to base64
                    with open(q[0], 'rb') as img_file:
                        img_base64 = base64.b64encode(img_file.read()).decode('utf-8')
                    content.append({'type': 'image_url', 'image_url': {"url": f"data:image/jpeg;base64,{img_base64}"}})
            else:
                content.append({'type': 'text', 'text': q})
                messages.append({'role': 'user', 'content': content})
                messages.append({'role': 'assistant', 'content': [{'type': 'text', 'text': a}]})
                content = []
        # The last assistant entry is the still-unanswered turn; remove it.
        messages.pop()

        responses = client.chat.completions.create(
            model="Kwai-Keye/Keye-VL-1_5-8B",
            messages=messages,
            top_p=0.95,
            temperature=0.6,
            stream=True,
            timeout=360
        )
        response_text = ""
        for response in responses:
            # A streamed chunk may have no choices, or a delta whose content
            # is None/empty; skip those instead of failing on ``str += None``.
            if not response.choices:
                continue
            delta_text = response.choices[0].delta.content
            if not delta_text:
                continue
            response_text += delta_text
            _chatbot[-1] = (chat_query, _parse_text(response_text))
            yield _chatbot

        full_response = _parse_text(response_text)
        _chatbot[-1] = (chat_query, full_response)
        # NOTE(review): the rendered (HTML-converted) reply is what gets stored
        # and later re-sent as the assistant turn -- confirm this is intended.
        task_history[-1] = (query, full_response)
        print("Kwai-Keye-VL-1_5-8B-Chat: " + full_response)
        yield _chatbot

    def regenerate(_chatbot, task_history):
        """Discard the last answer and generate it again."""
        if not task_history:
            # Generator: yield the unchanged history rather than returning it.
            yield _chatbot
            return
        item = task_history[-1]
        if item[1] is None:
            yield _chatbot
            return
        task_history[-1] = (item[0], None)
        chatbot_item = _chatbot.pop(-1)
        if chatbot_item[0] is None:
            _chatbot[-1] = (_chatbot[-1][0], None)
        else:
            _chatbot.append((chatbot_item[0], None))
        yield from predict(_chatbot, task_history)

    def add_text(history, task_history, text):
        """Append a text turn to both histories and clear the third output."""
        task_text = text
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [(_parse_text(text), None)]
        task_history = task_history + [(task_text, None)]
        return history, task_history, ""

    def add_file(history, task_history, file):
        """Append an uploaded file, stored as a one-element tuple of its path, to both histories."""
        history = history if history is not None else []
        task_history = task_history if task_history is not None else []
        history = history + [((file.name,), None)]
        task_history = task_history + [((file.name,), None)]
        return history, task_history

    def reset_user_input():
        """Clear the input textbox."""
        return gr.update(value="")

    def reset_state(task_history):
        """Empty the task history in place and return an empty chatbot value."""
        task_history.clear()
        return []

    with gr.Blocks() as demo:
        gr.Markdown("""<center><font size=3> Kwai-Keye-VL-1_5-8B Demo </center>""")
        chatbot = gr.Chatbot(label='Kwai-Keye-VL-1_5-8B', elem_classes="control-height", height=500)
        query = gr.Textbox(lines=2, label='Input')
        task_history = gr.State([])

        with gr.Row():
            addfile_btn = gr.UploadButton("📁 Upload (上传文件)", file_types=["image", "video"])
            submit_btn = gr.Button("🚀 Submit (发送)")
            regen_btn = gr.Button("🤔️ Regenerate (重试)")
            empty_bin = gr.Button("🧹 Clear History (清除历史)")

        submit_btn.click(add_text, [chatbot, task_history, query], [chatbot, task_history]).then(
            predict, [chatbot, task_history], [chatbot], show_progress=True
        )
        submit_btn.click(reset_user_input, [], [query])
        empty_bin.click(reset_state, [task_history], [chatbot], show_progress=True)
        regen_btn.click(regenerate, [chatbot, task_history], [chatbot], show_progress=True)
        addfile_btn.upload(add_file, [chatbot, task_history, addfile_btn], [chatbot, task_history], show_progress=True)

    # NOTE(review): only ``share`` is forwarded; the ``inbrowser``,
    # ``server_port`` and ``server_name`` options are parsed but unused here.
    demo.queue(default_concurrency_limit=40).launch(
        share=args.share,
    )
def main():
    """Script entry point: parse the command line, then start the demo."""
    _launch_demo(_get_args())


# Run the demo only when this file is executed as a script.
if __name__ == '__main__':
    main()