|
import re |
|
import io |
|
import os |
|
from typing import Optional, Tuple |
|
import datetime |
|
import sys |
|
import gradio as gr |
|
import requests |
|
import json |
|
from threading import Lock |
|
from langchain import ConversationChain, LLMChain |
|
from langchain.agents import load_tools, initialize_agent, Tool |
|
from langchain.tools.bing_search.tool import BingSearchRun, BingSearchAPIWrapper |
|
from langchain.chains.conversation.memory import ConversationBufferMemory |
|
from langchain.llms import OpenAI |
|
from langchain.chains import PALChain |
|
from langchain.llms import AzureOpenAI |
|
from langchain.utilities import ImunAPIWrapper, ImunMultiAPIWrapper |
|
from langchain.utils import get_url_path |
|
from openai.error import AuthenticationError, InvalidRequestError, RateLimitError |
|
import argparse |
|
import logging |
|
from opencensus.ext.azure.log_exporter import AzureLogHandler |
|
import uuid |
|
|
|
# Lazily-created logger shared by the whole module; see get_logger().
logger = None

# OpenAI credential taken from the environment (None when the variable is unset).
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

# User-facing messages shown in the chat window.
BUG_FOUND_MSG = "Some Functionalities not supported yet. Please refresh and hit 'Click to wake up MM-REACT'"
AUTH_ERR_MSG = "OpenAI key needed"
REFRESH_MSG = "Please refresh and hit 'Click to wake up MM-REACT'"

# Upper bound on completion length, passed as max_tokens to the LLM constructors.
MAX_TOKENS = 512

# Parsed command-line arguments.  The functions in this module read this
# global (ARGS.port, ARGS.openAIModel, ARGS.noIntermediateConv); it is
# assigned in the __main__ section.  Defining it here means the name exists
# even when the module is imported rather than run as a script.
ARGS = None

# Misspelled legacy name of the placeholder above, kept only so that any
# external code referring to it keeps working.
AGRS = None
|
|
|
|
|
|
|
def get_logger():
    """Return the module-wide logger, creating it on first use.

    The first call obtains ``logging.getLogger(__name__)``, attaches an
    ``AzureLogHandler`` constructed with no arguments, and caches the result
    in the module-level ``logger`` global.  Later calls return the cached
    object, so the handler is attached only once.
    """
    global logger

    # Fast path: already initialised by an earlier call.
    if logger is not None:
        return logger

    logger = logging.getLogger(__name__)
    logger.addHandler(AzureLogHandler())
    return logger
|
|
|
|
|
|
|
def load_chain(history, log_state):
    """Build a fresh conversational agent and reveal the chat controls.

    Used as the click handler of the "wake up" button.  The LLM is selected
    by ``ARGS.openAIModel``; the agent is given a buffer memory and a set of
    tools (math, image/OCR/document understanding, Bing search, photo
    editing).

    Args:
        history: Previous chat history.  It is not used: the returned history
            always starts over with a single greeting exchange.
        log_state: Previous session id.  It is only printed; a new id
            (``uuid.uuid1()``) is generated and returned.

    Returns:
        A 10-tuple: the chat history (twice), the agent, the new session id,
        and six Gradio update objects that make the message box, send button,
        upload button, example column and example title visible and switch
        the wake-up button to the "secondary" variant.

    Raises:
        ValueError: If ``ARGS.openAIModel`` is not one of the supported names.
    """
    global ARGS

    if ARGS.openAIModel == 'openAIGPT35':
        llm = OpenAI(temperature=0, max_tokens=MAX_TOKENS)
    elif ARGS.openAIModel == 'azureChatGPT':
        llm = AzureOpenAI(deployment_name="text-chat-davinci-002", model_name="text-chat-davinci-002", temperature=0, max_tokens=MAX_TOKENS)
    elif ARGS.openAIModel == 'azureGPT35turbo':
        llm = AzureOpenAI(deployment_name="gpt-35-turbo-version-0301", model_name="gpt-35-turbo (version 0301)", temperature=0, max_tokens=MAX_TOKENS)
    elif ARGS.openAIModel == 'azureTextDavinci003':
        llm = AzureOpenAI(deployment_name="text-davinci-003", model_name="text-davinci-003", temperature=0, max_tokens=MAX_TOKENS)
    else:
        # Fail here with a clear message instead of an UnboundLocalError on
        # the first use of `llm` further down.
        raise ValueError(
            "Unsupported openAIModel {!r}; expected one of 'openAIGPT35', "
            "'azureChatGPT', 'azureGPT35turbo', 'azureTextDavinci003'".format(ARGS.openAIModel)
        )

    memory = ConversationBufferMemory(memory_key="chat_history")

    # Image-understanding back ends; every endpoint is configured through
    # environment variables.
    imun_dense = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_URL2"),
        params=os.environ.get("IMUN_PARAMS2"),
        imun_subscription_key=os.environ.get("IMUN_SUBSCRIPTION_KEY2"))

    imun = ImunAPIWrapper()
    # The general "Image Understanding" tool combines the default wrapper
    # with the second one configured above.
    imun = ImunMultiAPIWrapper(imuns=[imun, imun_dense])

    imun_celeb = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_CELEB_URL"),
        params="")

    imun_read = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_OCR_READ_URL"),
        params=os.environ.get("IMUN_OCR_PARAMS"),
        imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY"))

    imun_receipt = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_OCR_RECEIPT_URL"),
        params=os.environ.get("IMUN_OCR_PARAMS"),
        imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY"))

    imun_businesscard = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_OCR_BC_URL"),
        params=os.environ.get("IMUN_OCR_PARAMS"),
        imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY"))

    imun_layout = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_OCR_LAYOUT_URL"),
        params=os.environ.get("IMUN_OCR_PARAMS"),
        imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY"))

    imun_invoice = ImunAPIWrapper(
        imun_url=os.environ.get("IMUN_OCR_INVOICE_URL"),
        params=os.environ.get("IMUN_OCR_PARAMS"),
        imun_subscription_key=os.environ.get("IMUN_OCR_SUBSCRIPTION_KEY"))

    bing = BingSearchAPIWrapper(k=2)

    def edit_photo(query: str) -> str:
        """Send an edit instruction and image URL to the photo-edit service.

        ``get_url_path`` splits *query* into the index where the URL starts
        and the URL itself; the text before that index is the instruction.
        Returns a user-facing sentence (success or failure).
        """
        endpoint = os.environ.get("PHOTO_EDIT_ENDPOINT_URL")
        query = query.strip()
        url_idx, img_url = get_url_path(query)
        if not img_url.startswith(("http://", "https://")):
            return "Invalid image URL"
        # Swap the local bind address for the configured host.  Skipped when
        # the variable is unset, because str.replace(..., None) raises
        # TypeError even for URLs that do not contain "0.0.0.0".
        short_host = os.environ.get("PHOTO_EDIT_ENDPOINT_URL_SHORT")
        if short_host:
            img_url = img_url.replace("0.0.0.0", short_host)
        instruction = query[:url_idx]

        job = {"image_path": img_url, "instruction": instruction}
        response = requests.post(endpoint, json=job)
        if response.status_code != 200:
            return "Could not finish the task try again later!"
        return "Here is the edited image " + endpoint + response.json()["edited_image"]

    tools = [
        Tool(
            name="PAL-MATH",
            func=PALChain.from_math_prompt(llm).run,
            description=(
                "A wrapper around calculator. "
                "A language model that is really good at solving complex word math problems."
                "Input should be a fully worded hard word math problem."
            )
        ),
        Tool(
            name="Image Understanding",
            func=imun.run,
            description=(
                "A wrapper around Image Understanding. "
                "Useful for when you need to understand what is inside an image (objects, texts, people)."
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        Tool(
            name="OCR Understanding",
            func=imun_read.run,
            description=(
                "A wrapper around OCR Understanding (Optical Character Recognition). "
                "Useful after Image Understanding tool has found text or handwriting is present in the image tags."
                "This tool can find the actual text, written name, or product name in the image."
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        Tool(
            name="Receipt Understanding",
            func=imun_receipt.run,
            description=(
                "A wrapper receipt understanding. "
                "Useful after Image Understanding tool has recognized a receipt in the image tags."
                "This tool can find the actual receipt text, prices and detailed items."
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        Tool(
            name="Business Card Understanding",
            func=imun_businesscard.run,
            description=(
                "A wrapper around business card understanding. "
                "Useful after Image Understanding tool has recognized businesscard in the image tags."
                "This tool can find the actual business card text, name, address, email, website on the card."
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        # The next two descriptions used to be verbatim copies of the
        # business-card text, which gave the agent no way to tell the three
        # document tools apart.
        Tool(
            name="Layout Understanding",
            func=imun_layout.run,
            description=(
                "A wrapper around layout and table understanding. "
                "Useful after Image Understanding tool has recognized a document, form or table in the image tags. "
                "This tool can find the actual text, tables and layout structure of the document. "
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        Tool(
            name="Invoice Understanding",
            func=imun_invoice.run,
            description=(
                "A wrapper around invoice understanding. "
                "Useful after Image Understanding tool has recognized an invoice in the image tags. "
                "This tool can find the actual invoice text, amounts and detailed items. "
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        Tool(
            name="Celebrity Understanding",
            func=imun_celeb.run,
            description=(
                "A wrapper around celebrity understanding. "
                "Useful after Image Understanding tool has recognized people in the image tags that could be celebrities."
                "This tool can find the name of celebrities in the image."
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
        BingSearchRun(api_wrapper=bing),
        Tool(
            name="Photo Editing",
            func=edit_photo,
            description=(
                "A wrapper around photo editing. "
                "Useful to edit an image with a given instruction."
                "Input should be an image url, or path to an image file (e.g. .jpg, .png)."
            )
        ),
    ]

    chain = initialize_agent(tools, llm, agent="conversational-assistant", verbose=True, memory=memory, return_intermediate_steps=True, max_iterations=4)

    # Print the previous session id for reference, then start a new session.
    log_state = log_state or ""
    print ("log_state {}".format(log_state))
    log_state = str(uuid.uuid1())
    print("langchain reloaded")

    properties = {'custom_dimensions': {'session': log_state}}
    get_logger().warning("langchain reloaded", extra=properties)

    # Every (re)load starts the conversation over with a greeting.
    history = []
    history.append(("Show me what you got!", "Hi Human, Please upload an image to get started!"))

    return (
        history,
        history,
        chain,
        log_state,
        gr.Textbox.update(visible=True),
        gr.Button.update(visible=True),
        gr.UploadButton.update(visible=True),
        gr.Row.update(visible=True),
        gr.HTML.update(visible=True),
        gr.Button.update(variant="secondary"),
    )
|
|
|
|
|
|
|
def run_chain(chain, inp):
    """Run one input through the agent and turn failures into reply text.

    Args:
        chain: Agent object exposing ``conversation(input=..., keep_short=...)``.
        inp: The user message (or image URL) to send.

    Returns:
        Whatever ``chain.conversation`` returns on success.  On failure, a
        string describing the error; no exception derived from ``Exception``
        escapes this function.
    """
    try:
        return chain.conversation(input=inp, keep_short=ARGS.noIntermediateConv)
    except AuthenticationError as auth_err:
        reply = AUTH_ERR_MSG + str(datetime.datetime.now()) + ". " + str(auth_err)
        print("output", reply)
        return reply
    except RateLimitError as rate_err:
        return "\n\nRateLimitError: " + str(rate_err)
    except ValueError as value_err:
        return "\n\nValueError: " + str(value_err)
    except InvalidRequestError as request_err:
        return "\n\nInvalidRequestError: " + str(request_err)
    except Exception as err:
        # Last resort: show a generic "not supported" message plus the error.
        return "\n\n" + BUG_FOUND_MSG + ":\n\n" + str(err)
|
|
|
|
|
class ChatWrapper:
    """Callable chat handler that processes one message at a time.

    A single instance is registered as the Gradio callback for both the
    message box and the send button; the lock serialises the calls.
    """

    def __init__(self):
        self.lock = Lock()

    def __call__(
        self, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain], log_state
    ):
        """Execute the chat functionality.

        Args:
            inp: The text typed by the user.
            history: List of (user, bot) pairs shown so far, or None.
            chain: The agent to run the message through.
            log_state: Session id attached to the log records.

        Returns:
            ``(history, history, "")``: the updated history twice, plus an
            empty string. In this file the call is wired to the outputs
            ``[chatbot, history_state, message]``.
        """
        properties = {'custom_dimensions': {'session': log_state}}

        # `with` releases the lock on every exit path, including exceptions,
        # which simply propagate to the caller.
        with self.lock:
            print("\n==== date/time: " + str(datetime.datetime.now()) + " ====")
            print("inp: " + inp)
            get_logger().warning("inp: " + inp, extra=properties)

            history = history or []

            outputs = process_chain_output(run_chain(chain, inp))

            print (" len(outputs) {}".format(len(outputs)))
            # The first reply is paired with the user's message; any further
            # replies are appended as bot-only rows.
            for i, output in enumerate(outputs):
                history.append((inp if i == 0 else None, output))

        print (history)
        get_logger().warning(str(json.dumps(outputs)), extra=properties)

        return history, history, ""
|
|
|
def add_image_with_path(state, chain, imagepath, log_state):
    """Feed an image, given by its path, to the agent and record the replies.

    The path is turned into a URL of the form
    ``http://0.0.0.0:<ARGS.port>/file=<imagepath>`` and sent to the agent as
    the input.

    Args:
        state: Chat history so far (list of pairs) or None.
        chain: The agent to run.
        imagepath: Path of the image file.
        log_state: Session id attached to the log records.

    Returns:
        ``(state, state)``: the updated history, twice.
    """
    state = state or []

    url_input_for_chain = "http://0.0.0.0:{}/file={}".format(ARGS.port, imagepath)
    outputs = process_chain_output(run_chain(chain, url_input_for_chain))

    # The first reply is shown next to the image itself (a one-element tuple
    # holding the path); later replies are bot-only rows.
    for position, reply in enumerate(outputs):
        user_side = (imagepath,) if position == 0 else None
        state.append((user_side, reply))

    print (state)

    session_props = {'custom_dimensions': {'session': log_state}}
    azure_log = get_logger()
    azure_log.warning("url_input_for_chain: " + url_input_for_chain, extra=session_props)
    if outputs is None:
        outputs = ""
    azure_log.warning(str(json.dumps(outputs)), extra=session_props)

    return state, state
|
|
|
|
|
|
|
def add_image(state, chain, image, log_state):
    """Feed an uploaded image to the agent and record the replies.

    Args:
        state: Chat history so far (list of pairs) or None.
        chain: The agent to run.
        image: Uploaded file object; only its ``name`` attribute is read.
        log_state: Session id attached to the log records.

    Returns:
        ``(state, state)``: the updated history, twice.
    """
    # Spaces are percent-encoded so the path can be embedded in a URL.
    imagepath = image.name.replace(" ", "%20")

    # Everything after computing the path was a statement-for-statement copy
    # of add_image_with_path, so delegate to it rather than keep two copies.
    return add_image_with_path(state, chain, imagepath, log_state)
|
|
|
|
|
def replace_with_image_markup(text):
    """Return the last whitespace-separated token of *text* as an image URL.

    A single trailing ``.`` or ``?`` is removed from the token.  Tokens are
    split on any whitespace (spaces, tabs, newlines), so a URL that follows a
    line break is still isolated from the sentence before it.

    Args:
        text: A reply whose final token is expected to be the image URL.

    Returns:
        The extracted token, or ``""`` when *text* is empty or blank.
    """
    tokens = text.split()
    if not tokens:
        return ""

    img_url = tokens[-1]
    # Drop one trailing sentence-ending character; anything before it is
    # kept untouched.
    if img_url.endswith((".", "?")):
        img_url = img_url[:-1]
    return img_url
|
|
|
|
|
def process_chain_output(outputs):
    """Normalise the agent's output into a list of chat entries.

    * A string becomes a one-element list; a reply that is just ``"AI:"``
      is replaced by the refresh message.
    * A list is cleaned up only when ``ARGS.noIntermediateConv`` is set:
      ``"AI:"`` replies are replaced by the refresh message, and a reply
      containing "assistant: here is the edited image " (case-insensitive)
      is split into a caption and a one-element tuple holding the image URL.
    * Anything else is returned unchanged.
    """
    empty_ai_reply = "AI:"

    if isinstance(outputs, str):
        single = REFRESH_MSG if outputs.strip() == empty_ai_reply else outputs
        return [single]

    # Non-list values, and lists when intermediate conversation is shown,
    # pass straight through.
    if not isinstance(outputs, list) or not ARGS.noIntermediateConv:
        return outputs

    cleaned = []
    for reply in outputs:
        if reply.strip() == empty_ai_reply:
            reply = REFRESH_MSG

        if "assistant: here is the edited image " not in reply.lower():
            cleaned.append(reply)
            continue

        # Found an edited-image reply: emit the caption, then the image.
        img_url = replace_with_image_markup(reply)
        cleaned.append("Assistant: Here is the edited image")
        if img_url is not None:
            cleaned.append((img_url,))
        else:
            cleaned.append(reply)

    return cleaned
|
|
|
|
|
def init_and_kick_off():
    """Build the Gradio interface, wire up its events and start the server.

    The page consists of a wake-up button, two HTML banners, the chatbot with
    a hidden column of example images beside it, and a hidden input row
    (message box, send button, upload button).  ``load_chain`` is bound to the
    wake-up button and returns the updates that reveal the hidden parts.
    The server listens on 0.0.0.0 at ``ARGS.port``.
    """
    global ARGS

    chat = ChatWrapper()

    example_title_html = """<h3>Examples to start conversation..</h3>"""
    banner_html = """<center><b><p style="color:Red;">MM-REACT: March 29th version with image understanding capabilities</p></b></center>"""
    links_html = """
    <center>
        <a href="https://multimodal-react.github.io/"> MM-ReAct Website</a>
        ·
        <a href="https://arxiv.org/abs/2303.11381">MM-ReAct Paper</a>
        ·
        <a href="https://github.com/microsoft/MM-REACT">MM-ReAct Code</a>
    </center>
    """

    example_image_size = 90
    col_min_width = 80
    button_variant = "primary"

    def build_example_row(image_file):
        """Add one thumbnail + "Try it!" row; return (button, hidden path box)."""
        with gr.Row():
            with gr.Column(scale=0.50, min_width=col_min_width):
                gr.Image(image_file, interactive=False).style(height=example_image_size, width=example_image_size)
            with gr.Column(scale=0.50, min_width=col_min_width):
                try_button = gr.Button(elem_id="tryButton", value="Try it!", variant=button_variant).style(full_width=True)
                # Invisible text field that carries the image path to the handler.
                path_box = gr.Text(image_file, interactive=False, visible=False)
        return try_button, path_box

    # Rows appear on the page in this order.
    display_order = (
        "images/receipt.png",
        "images/money.png",
        "images/cartoon.png",
        "images/product.png",
        "images/celebrity.png",
    )
    # Click handlers are registered in this order.
    wiring_order = (
        "images/money.png",
        "images/cartoon.png",
        "images/receipt.png",
        "images/product.png",
        "images/celebrity.png",
    )

    with gr.Blocks(css="#tryButton {width: 120px;}") as block:
        llm_state = gr.State()
        history_state = gr.State()
        chain_state = gr.State()
        log_state = gr.State()

        reset_btn = gr.Button(value="!!!CLICK to wake up MM-REACT!!!", variant="primary", elem_id="resetbtn").style(full_width=True)
        gr.HTML(links_html)
        gr.HTML(banner_html)

        with gr.Row():
            with gr.Column(scale=1.0, min_width=100):
                chatbot = gr.Chatbot(elem_id="chatbot", label="MM-REACT Bot").style(height=620)
            with gr.Column(scale=0.20, min_width=200, visible=False) as exampleCol:
                with gr.Row():
                    grExampleTitle = gr.HTML(example_title_html, visible=False)
                example_rows = {}
                for image_file in display_order:
                    example_rows[image_file] = build_example_row(image_file)

        with gr.Row():
            with gr.Column(scale=0.75):
                message = gr.Textbox(label="Upload a pic and ask!",
                                     placeholder="Type your question about the uploaded image",
                                     lines=1, visible=False)
            with gr.Column(scale=0.15):
                submit = gr.Button(value="Send", variant="secondary", visible=False).style(full_width=True)
            with gr.Column(scale=0.10, min_width=0):
                btn = gr.UploadButton("🖼️", file_types=["image"], visible=False).style(full_width=True)

        # Pressing Enter in the box and clicking "Send" do the same thing.
        message.submit(chat, inputs=[message, history_state, chain_state, log_state], outputs=[chatbot, history_state, message])
        submit.click(chat, inputs=[message, history_state, chain_state, log_state], outputs=[chatbot, history_state, message])

        btn.upload(add_image, inputs=[history_state, chain_state, btn, log_state], outputs=[history_state, chatbot])

        # (Re)creates the agent and reveals the hidden controls.
        reset_btn.click(load_chain, inputs=[history_state, log_state], outputs=[chatbot, history_state, chain_state, log_state, message, submit, btn, exampleCol, grExampleTitle, reset_btn])

        for image_file in wiring_order:
            try_button, path_box = example_rows[image_file]
            try_button.click(add_image_with_path, inputs=[history_state, chain_state, path_box, log_state], outputs=[history_state, chatbot])

    block.launch(server_name="0.0.0.0", server_port = ARGS.port)
|
|
|
def parse_args(argv=None):
    """Parse the command-line options of the demo server.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Namespace with ``port`` (int, default 7860), ``openAIModel`` (str,
        default 'azureGPT35turbo') and ``noIntermediateConv`` (bool, default
        True).
    """
    parser = argparse.ArgumentParser()

    parser.add_argument('--port', type=int, required=False, default=7860)
    parser.add_argument('--openAIModel', type=str, required=False, default='azureGPT35turbo')
    parser.add_argument('--noIntermediateConv', default=True, action='store_true', help='if this flag is turned on no intermediate conversation should be shown')
    # --noIntermediateConv is a store_true flag whose default is already True,
    # so on its own it can never yield False.  This companion flag writes
    # False to the same destination.
    parser.add_argument('--showIntermediateConv', dest='noIntermediateConv', action='store_false', help='show the intermediate conversation (turns --noIntermediateConv off)')

    return parser.parse_args(argv)


if __name__ == '__main__':
    # Module-level assignment: this is the ARGS global the functions read.
    ARGS = parse_args()

    init_and_kick_off()