File size: 8,172 Bytes
8d00201
 
 
 
 
 
 
 
 
 
 
6accf0d
 
 
 
8d00201
 
 
 
 
 
 
6accf0d
8d00201
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6accf0d
8d00201
 
 
 
 
 
 
6accf0d
8d00201
 
 
 
 
 
 
 
 
 
 
 
 
6accf0d
 
8d00201
 
 
6accf0d
8d00201
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6accf0d
8d00201
 
 
 
 
 
 
 
 
6accf0d
8d00201
 
 
 
 
6accf0d
 
8d00201
 
 
 
 
 
 
 
 
 
6accf0d
8d00201
 
 
6accf0d
 
 
 
 
 
 
8d00201
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6accf0d
 
 
 
 
8d00201
 
 
 
 
 
 
 
 
 
6accf0d
8d00201
6accf0d
8d00201
 
 
6accf0d
8d00201
6accf0d
8d00201
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
#!/usr/bin/env python

import gradio as gr
import os
import re
from PIL import Image
import base64
import time

DESCRIPTION = '''# <a href="https://github.com/THUDM/CogVLM">VisualGLM</a>'''

MAINTENANCE_NOTICE1 = 'Hint 1: If the app report "Something went wrong, connection error out", please turn off your proxy and retry.<br>Hint 2: If you upload a large size of image like 10MB, it may take some time to upload and process. Please be patient and wait.'

GROUNDING_NOTICE = 'Hint: When you check "Grounding", please use the <a href="https://github.com/THUDM/CogVLM/blob/main/utils/template.py#L344">corresponding prompt</a> or the examples below.'


NOTES = 'This app is adapted from <a href="https://github.com/THUDM/CogVLM">https://github.com/THUDM/CogVLM</a>. It would be recommended to check out the repo if you want to see the detail of our model.'

import json
import requests
import base64
import hashlib
from utils import parse_response

default_chatbox = [("", "Hi, What do you want to know about this image?")]

URL = os.environ.get("URL")

def process_image(image_prompt):
    image = Image.open(image_prompt)
    print(f"height:{image.height}, width:{image.width}")
    resized_image = image.resize((224, 224), )
    timestamp = int(time.time())
    file_ext = os.path.splitext(image_prompt)[1]
    filename = f"examples/{timestamp}{file_ext}"
    resized_image.save(filename)
    print(f"temporal filename {filename}")
    with open(filename, "rb") as image_file:
        bytes = base64.b64encode(image_file.read())
        encoded_img = str(bytes, encoding='utf-8')
        image_hash = hashlib.sha256(bytes).hexdigest()
        os.remove(filename)
        return encoded_img, image_hash


def process_image_without_resize(image_prompt):
    image = Image.open(image_prompt)
    print(f"height:{image.height}, width:{image.width}")
    timestamp = int(time.time())
    file_ext = os.path.splitext(image_prompt)[1]
    filename = f"examples/{timestamp}{file_ext}"
    filename_grounding = f"examples/{timestamp}_grounding{file_ext}"
    image.save(filename)
    print(f"temporal filename {filename}")
    with open(filename, "rb") as image_file:
        bytes = base64.b64encode(image_file.read())
        encoded_img = str(bytes, encoding='utf-8')
        image_hash = hashlib.sha256(bytes).hexdigest()
        os.remove(filename)
        return image, encoded_img, image_hash, filename_grounding


def is_chinese(text):
    zh_pattern = re.compile(u'[\u4e00-\u9fa5]+')
    return zh_pattern.search(text)


def post(
        input_text,
        temperature,
        top_p,
        image_prompt,
        result_previous,
        hidden_image,
        grounding    
        ):
    result_text = [(ele[0], ele[1]) for ele in result_previous]
    for i in range(len(result_text)-1, -1, -1):
        if result_text[i][0] == "" or result_text[i][0] == None:
            del result_text[i]
    print(f"history {result_text}")

    is_zh = is_chinese(input_text)

    if image_prompt is None:
        print("Image empty")
        if is_zh:
            result_text.append((input_text, '图片为空!请上传图片并重试。'))
        else:
            result_text.append((input_text, 'Image empty! Please upload a image and retry.'))
        return input_text, result_text, hidden_image
    elif input_text == "":
        print("Text empty")
        result_text.append((input_text, 'Text empty! Please enter text and retry.'))
        return "", result_text, hidden_image                

    headers = {
            "Content-Type": "application/json; charset=UTF-8",
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.87 Safari/537.36",
        }
    if image_prompt:
        pil_img, encoded_img, image_hash, image_path_grounding = process_image_without_resize(image_prompt)
        print(f"image_hash:{image_hash}, hidden_image_hash:{hidden_image}")

        if hidden_image is not None and image_hash != hidden_image:
            print("image has been update")
            result_text = []        
        hidden_image = image_hash        
    else:
        encoded_img = None

    print('request chat model...' if not grounding else 'request grounding model...')
    data = json.dumps({
        'text': input_text,
        'image': encoded_img,
        'temperature': temperature,
        'top_p': top_p,
        'history': result_text,
        'is_grounding': grounding
    })
    try:
        response = requests.request("POST", URL, headers=headers, data=data, timeout=(60, 100)).json()
    except Exception as e:
        print("error message", e)
        if is_zh:
            result_text.append((input_text, '超时!请稍等几分钟再重试。'))
        else:
            result_text.append((input_text, 'Timeout! Please wait a few minutes and retry.'))
        return "", result_text, hidden_image
    print('request done...')
    # response = {'result':input_text}

    answer = str(response['result'])
    if grounding:
        parse_response(pil_img, answer, image_path_grounding)
        new_answer = answer.replace(input_text, "")
        result_text.append((input_text, new_answer))
        result_text.append((None, (image_path_grounding,)))
    else:
        result_text.append((input_text, answer))
    print(result_text)
    print('finished')
    return "", result_text, hidden_image


def clear_fn(value):
    return "", default_chatbox, None

def clear_fn2(value):
    return default_chatbox


def main():
    gr.close_all()
    examples = []
    with open("./examples/example_inputs.jsonl") as f:
        for line in f:
            data = json.loads(line)
            examples.append(data)


    with gr.Blocks(css='style.css') as demo:

        with gr.Row():
            with gr.Column(scale=4.5):
                with gr.Group():
                    input_text = gr.Textbox(label='Input Text', placeholder='Please enter text prompt below and press ENTER.')
                    with gr.Row():
                        run_button = gr.Button('Generate')
                        clear_button = gr.Button('Clear')

                    image_prompt = gr.Image(type="filepath", label="Image Prompt", value=None)
                with gr.Row():
                    grounding = gr.Checkbox(label="Grounding")
                with gr.Row():
                    grounding_notice = gr.Markdown(GROUNDING_NOTICE)

                with gr.Row():
                    temperature = gr.Slider(maximum=1, value=0.8, minimum=0, label='Temperature')
                    top_p = gr.Slider(maximum=1, value=0.4, minimum=0, label='Top P')
            with gr.Column(scale=5.5):
                result_text = gr.components.Chatbot(label='Multi-round conversation History', value=[("", "Hi, What do you want to know about this image?")]).style(height=550)
                hidden_image_hash = gr.Textbox(visible=False)

        gr_examples = gr.Examples(examples=[[example["text"], example["image"]] for example in examples], 
                                  inputs=[input_text, image_prompt],
                                  label="Example Inputs (Click to insert an examplet into the input box)",
                                  examples_per_page=6)

        gr.Markdown(MAINTENANCE_NOTICE1)
        gr.Markdown(NOTES)

        print(gr.__version__)
        run_button.click(fn=post,inputs=[input_text, temperature, top_p, image_prompt, result_text, hidden_image_hash, grounding],
                         outputs=[input_text, result_text, hidden_image_hash])
        input_text.submit(fn=post,inputs=[input_text, temperature, top_p, image_prompt, result_text, hidden_image_hash, grounding],
                         outputs=[input_text, result_text, hidden_image_hash])
        clear_button.click(fn=clear_fn, inputs=clear_button, outputs=[input_text, result_text, image_prompt])
        image_prompt.upload(fn=clear_fn2, inputs=clear_button, outputs=[result_text])
        image_prompt.clear(fn=clear_fn2, inputs=clear_button, outputs=[result_text])

        print(gr.__version__)

    demo.queue(concurrency_count=10)
    demo.launch()

if __name__ == '__main__':
    main()