File size: 4,538 Bytes
59e8091
 
 
 
 
 
 
 
99ad72b
 
 
59e8091
 
 
 
 
 
99ad72b
 
 
 
 
59e8091
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99ad72b
 
 
 
 
 
 
 
 
 
59e8091
 
 
 
 
 
 
99ad72b
59e8091
 
 
 
 
99ad72b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59e8091
99ad72b
 
 
 
 
59e8091
 
99ad72b
59e8091
 
 
 
 
 
 
 
 
 
 
 
99ad72b
59e8091
 
 
 
 
99ad72b
 
 
 
 
 
 
 
 
59e8091
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import gradio as gr
from transformers import LlavaProcessor, LlavaForConditionalGeneration, TextIteratorStreamer
from threading import Thread
import re
import time 
from PIL import Image
import torch
import cv2
import spaces

# Hugging Face Hub checkpoint used for both the processor and the model.
model_id = "llava-hf/llava-interleave-qwen-0.5b-hf"

# The processor turns the prompt text and the images into model inputs.
processor = LlavaProcessor.from_pretrained(model_id)

# Load the weights in half precision and move the model to the GPU.
model = LlavaForConditionalGeneration.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
).to("cuda")


def replace_video_with_images(text, frames):
    """Expand each ``<video>`` placeholder in *text* into *frames* ``<image>`` tokens."""
    image_tokens = "<image>" * frames
    # Splitting on the placeholder and re-joining with the expansion swaps
    # every occurrence of it.
    return image_tokens.join(text.split("<video>"))

def sample_frames(video_file, num_frames):
    """Sample frames from a video at a regular stride and return them as PIL images.

    The stride is ``total_frames // num_frames`` (but at least 1) and every
    frame whose index is a multiple of the stride is kept.  Because of the
    integer division the result can hold a few more than ``num_frames``
    images, so callers should rely on ``len()`` of the result.

    Args:
        video_file: Path of the video to open with OpenCV.
        num_frames: Approximate number of frames wanted; must be >= 1.

    Returns:
        A list of RGB ``PIL.Image`` objects; empty when no frame could be read.

    Raises:
        ValueError: If ``num_frames`` is smaller than 1.
    """
    if num_frames < 1:
        raise ValueError(f"num_frames must be at least 1, got {num_frames}")

    video = cv2.VideoCapture(video_file)
    try:
        total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        # A clip shorter than num_frames would give a stride of 0 and a
        # ZeroDivisionError in the modulo below; keep every frame instead.
        interval = max(1, total_frames // num_frames)
        frames = []
        for i in range(total_frames):
            ret, frame = video.read()
            # A failed read provides no usable image, so check `ret` before
            # touching `frame`.
            if not ret:
                continue
            if i % interval == 0:
                # Convert only the frames that are kept (BGR -> RGB for PIL).
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames.append(Image.fromarray(rgb))
        return frames
    finally:
        # Release the capture even if decoding or conversion raised.
        video.release()

def _load_visuals(paths, frames_per_video):
    """Turn file paths into RGB PIL images: one per picture, sampled frames per video.

    Files whose extension is neither a known video nor a PIL-registered image
    extension are skipped.
    """
    video_extensions = ("avi", "mp4", "mov", "mkv", "flv", "wmv", "mjpeg")
    image_extensions = tuple(Image.registered_extensions())
    visuals = []
    for path in paths:
        # Compare case-insensitively so e.g. "CLIP.MP4" is recognised too.
        lowered = path.lower()
        if lowered.endswith(video_extensions):
            visuals.extend(sample_frames(path, frames_per_video))
        elif lowered.endswith(image_extensions):
            # convert() returns a loaded copy, so the file can be closed here.
            with Image.open(path) as img:
                visuals.append(img.convert("RGB"))
    return visuals


@spaces.GPU
def bot_streaming(message, history):
    """Stream the model's answer for one chat turn.

    Args:
        message: The multimodal user message; ``message.text`` is the prompt
            text and ``message.files`` the uploaded files (each with a
            ``.path``).
        history: Previous turns.  When the current turn has no files, the
            last entry whose first element is a tuple is used and its first
            item is taken as the file path (assumed to be how earlier
            uploads are stored -- TODO confirm against the Gradio version).

    Yields:
        The text generated so far, growing with every streamed chunk.

    Raises:
        gr.Error: If no image/video is available for this turn or none of the
            files has a supported extension.
    """
    if message.files:
        paths = [file.path for file in message.files]
    else:
        # No upload in this turn: fall back to the most recent file kept in
        # the history (the last match wins).
        paths = []
        for hist in history or []:
            if isinstance(hist[0], tuple):
                paths = [hist[0][0]]

    if not paths:
        raise gr.Error("You need to upload an image or video for LLaVA to work.")

    # A single file gets 12 sampled frames if it is a video; with several
    # files each video contributes 6.
    single_file = len(paths) == 1
    images = _load_visuals(paths, 12 if single_file else 6)
    if not images:
        raise gr.Error("Unsupported file type. Please upload an image or a video.")

    # One <image> token per image actually passed to the processor; the
    # sampler may return more frames than requested, so count them.
    image_tokens = "<image>" * len(images)
    if single_file:
        prompt = f"<|im_start|>user {image_tokens}\n{message.text}<|im_end|><|im_start|>assistant"
    else:
        prompt = "<|im_start|>user" + image_tokens + f"\n{message.text}<|im_end|><|im_start|>assistant"

    # Keyword arguments keep the call independent of the processor's
    # positional parameter order.
    inputs = processor(text=prompt, images=images, return_tensors="pt").to("cuda", torch.float16)
    # skip_prompt makes the streamer emit only newly generated text, so the
    # prompt does not have to be sliced off by a guessed length.
    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=100)

    # generate() blocks, so run it in a worker thread and consume the
    # streamer from this generator.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        time.sleep(0.01)
        yield buffer


# Chat UI: a multimodal textbox accepting several files, plus canned examples.
# The description names the 0.5B checkpoint because that is the one loaded
# through `model_id` in this file.
demo = gr.ChatInterface(
    fn=bot_streaming,
    title="LLaVA Interleave",
    examples=[
        {"text": "What are these cats doing?", "files": ["./cats.mp4"]},
        {"text": "The input contains two videos, are the cats in this video and this video doing the same thing?", "files": ["./cats_1.mp4", "./cats_2.mp4"]},
        {"text": "What is on the flower?", "files": ["./bee.jpg"]},
        {"text": "There are two images in the input. What is the relationship between this image and this image?", "files": ["./bee.jpg", "./depth-bee.png"]},
        {"text": "How to make this pastry?", "files": ["./baklava.png"]},
    ],
    textbox=gr.MultimodalTextbox(file_count="multiple"),
    description="Try [LLaVA Interleave](https://huggingface.co/docs/transformers/main/en/model_doc/llava) in this demo (more specifically, the [Qwen-1.5-0.5B variant](https://huggingface.co/llava-hf/llava-interleave-qwen-0.5b-hf)). Upload an image or a video, and start chatting about it, or simply try one of the examples below. If you don't upload an image, you will receive an error. ",
    stop_btn="Stop Generation",
    multimodal=True,
)
demo.launch(debug=True)