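"""Gradio demo that compares document question answering models (LayoutLM,
Donut, and AWS Textract Queries; LiLT is wired up but currently disabled) on
the same document/question pairs."""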
import io
import os
import boto3
import traceback
import re
import logging
import gradio as gr
from PIL import Image, ImageDraw
from docquery.document import load_document, ImageDocument
from docquery.ocr_reader import get_ocr_reader
from transformers import AutoTokenizer, AutoModelForQuestionAnswering
from transformers import DonutProcessor, VisionEncoderDecoderModel
from transformers import pipeline
# Work around SSL certificate errors when downloading model weights
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
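# Silence the Hugging Face tokenizers fork-parallelism warning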
os.environ["TOKENIZERS_PARALLELISM"] = "false"
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Init models
layoutlm_pipeline = pipeline(
"document-question-answering",
model="impira/layoutlm-document-qa",
)
lilt_tokenizer = AutoTokenizer.from_pretrained("SCUT-DLVCLab/lilt-infoxlm-base")
lilt_model = AutoModelForQuestionAnswering.from_pretrained(
"nielsr/lilt-xlm-roberta-base"
)
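# NOTE: LiLT is loaded here but commented out in the MODELS mapping below.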
donut_processor = DonutProcessor.from_pretrained(
"naver-clova-ix/donut-base-finetuned-docvqa"
)
donut_model = VisionEncoderDecoderModel.from_pretrained(
"naver-clova-ix/donut-base-finetuned-docvqa"
)
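# Display names for the model choices offered in the UI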
TEXTRACT = "Textract Query"
LAYOUTLM = "LayoutLM"
DONUT = "Donut"
LILT = "LiLT"
def image_to_byte_array(image: Image.Image) -> bytes:
    """Serialize a PIL image to PNG bytes (the format the Textract API expects)."""
    buffer = io.BytesIO()
    image.save(buffer, format="PNG")
    return buffer.getvalue()
def run_textract(question, document):
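    """Answer `question` using the Textract Queries feature.

    Requires AWS credentials with Textract permissions to be available to
    boto3 (e.g. via environment variables).
    """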
logger.info(f"Running Textract model.")
image_as_byte_base64 = image_to_byte_array(image=document.b)
response = boto3.client("textract").analyze_document(
Document={
"Bytes": image_as_byte_base64,
},
FeatureTypes=[
"QUERIES",
],
QueriesConfig={
"Queries": [
{
"Text": question,
"Pages": [
"*",
],
},
]
},
)
logger.info(f"Output of Textract model {response}.")
for element in response["Blocks"]:
if element["BlockType"] == "QUERY_RESULT":
return {
"score": element["Confidence"],
"answer": element["Text"],
# "word_ids": element
}
    else:
        # The loop only returns on a QUERY_RESULT block; reaching here means none was found
        raise Exception("No QUERY_RESULT found in the response from Textract.")
def run_layoutlm(question, document):
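    """Answer `question` with the LayoutLM document-QA pipeline; returns the
    answer span plus the start/end word ids used to highlight it."""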
logger.info(f"Running layoutlm model.")
result = layoutlm_pipeline(document.context["image"][0][0], question)[0]
logger.info(f"Output of layoutlm model {result}.")
# [{'score': 0.9999411106109619, 'answer': 'LETTER OF CREDIT', 'start': 106, 'end': 108}]
return {
"score": result["score"],
"answer": result["answer"],
"word_ids": [result["start"], result["end"]],
"page": 0,
}
def run_lilt(question, document):
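    """Answer `question` with LiLT as extractive QA: pick the highest-scoring
    start/end token logits and decode the span in between."""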
logger.info(f"Running lilt model.")
# use this model + tokenizer
processed_document = document.context["image"][0][1]
words = [x[0] for x in processed_document]
boxes = [x[1] for x in processed_document]
encoding = lilt_tokenizer(
text=question,
text_pair=words,
boxes=boxes,
add_special_tokens=True,
return_tensors="pt",
)
outputs = lilt_model(**encoding)
logger.info(f"Output for lilt model {outputs}.")
answer_start_index = outputs.start_logits.argmax()
answer_end_index = outputs.end_logits.argmax()
predict_answer_tokens = encoding.input_ids[
0, answer_start_index: answer_end_index + 1
]
predict_answer = lilt_tokenizer.decode(
predict_answer_tokens, skip_special_tokens=True
)
    return {
        # No calibrated confidence is computed for this extractive decoding
        "score": "n/a",
        "answer": predict_answer,
    }
def run_donut(question, document):
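    """Answer `question` with Donut: the image is encoded directly and the
    answer is generated as text, so no separate OCR step is involved."""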
logger.info(f"Running donut model.")
# prepare encoder inputs
pixel_values = donut_processor(
document.context["image"][0][0], return_tensors="pt"
).pixel_values
# prepare decoder inputs
task_prompt = "<s_docvqa><s_question>{user_input}</s_question><s_answer>"
prompt = task_prompt.replace("{user_input}", question)
decoder_input_ids = donut_processor.tokenizer(
prompt, add_special_tokens=False, return_tensors="pt"
).input_ids
# generate answer
outputs = donut_model.generate(
pixel_values,
decoder_input_ids=decoder_input_ids,
max_length=donut_model.decoder.config.max_position_embeddings,
early_stopping=True,
pad_token_id=donut_processor.tokenizer.pad_token_id,
eos_token_id=donut_processor.tokenizer.eos_token_id,
use_cache=True,
num_beams=1,
bad_words_ids=[[donut_processor.tokenizer.unk_token_id]],
return_dict_in_generate=True,
)
logger.info(f"Output for donut {outputs}")
sequence = donut_processor.batch_decode(outputs.sequences)[0]
sequence = sequence.replace(donut_processor.tokenizer.eos_token, "").replace(
donut_processor.tokenizer.pad_token, ""
)
sequence = re.sub(
r"<.*?>", "", sequence, count=1
).strip() # remove first task start token
result = donut_processor.token2json(sequence)
    return {
        # Donut generates the answer; no confidence score is exposed here
        "score": "n/a",
        "answer": result["answer"],
    }
def process_path(path):
    """Load a document from `path` and return one update per wired output:
    (document, image gallery, clear button, JSON output, answer textbox)."""
    if path:
        try:
            document = load_document(path)
            return (
                document,
                gr.update(visible=True, value=document.preview),
                gr.update(visible=True),
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=None),
            )
        except Exception:
            # Keep the stack trace in the logs; the UI is simply reset below
            traceback.print_exc()
    return (
        None,
        gr.update(visible=False, value=None),
        gr.update(visible=False),
        gr.update(visible=False, value=None),
        gr.update(visible=False, value=None),
    )
def process_upload(file):
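    """Process an uploaded file; mirrors process_path's five return values."""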
if file:
return process_path(file.name)
else:
        return (
            None,
            gr.update(visible=False, value=None),
            gr.update(visible=False),
            gr.update(visible=False, value=None),
            gr.update(visible=False, value=None),
        )
def lift_word_boxes(document, page):
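    """Return the OCR (word, box) pairs extracted for `page` of the document."""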
return document.context["image"][page][1]
def expand_bbox(word_boxes):
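    """Return the smallest box enclosing all of the given word boxes."""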
if len(word_boxes) == 0:
return None
    x1s, y1s, x2s, y2s = zip(*[x[1] for x in word_boxes])
    return [min(x1s), min(y1s), max(x2s), max(y2s)]
# LayoutLM word boxes are normalized to [0, 1000]; scale them back to pixel
# coordinates, padding slightly so the highlight does not hug the text.
def normalize_bbox(box, width, height, padding=0.005):
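    """Scale a [0, 1000]-normalized box to pixel coordinates, with padding."""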
min_x, min_y, max_x, max_y = [c / 1000 for c in box]
if padding != 0:
min_x = max(0, min_x - padding)
min_y = max(0, min_y - padding)
max_x = min(max_x + padding, 1)
max_y = min(max_y + padding, 1)
return [min_x * width, min_y * height, max_x * width, max_y * height]
MODELS = {
LAYOUTLM: run_layoutlm,
DONUT: run_donut,
# LILT: run_lilt,
TEXTRACT: run_textract,
}
def process_question(question, document, model=list(MODELS.keys())[0]):
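    """Run the selected model on the document and return updates for the
    image gallery, the raw JSON output, and the top-answer textbox."""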
if not question or document is None:
return None, None, None
logger.info(f"Running for model {model}")
prediction = MODELS[model](question=question, document=document)
logger.info(f"Got prediction {prediction}")
pages = [x.copy().convert("RGB") for x in document.preview]
text_value = prediction["answer"]
if "word_ids" in prediction:
logger.info(f"Setting bounding boxes.")
image = pages[prediction["page"]]
draw = ImageDraw.Draw(image, "RGBA")
word_boxes = lift_word_boxes(document, prediction["page"])
x1, y1, x2, y2 = normalize_bbox(
expand_bbox([word_boxes[i] for i in prediction["word_ids"]]),
image.width,
image.height,
)
draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255)))
return (
gr.update(visible=True, value=pages),
gr.update(visible=True, value=prediction),
gr.update(
visible=True,
value=text_value,
),
)
def load_example_document(img, question, model):
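    """Build a document from an example image and answer `question` on it."""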
if img is not None:
document = ImageDocument(Image.fromarray(img), get_ocr_reader())
preview, answer, answer_text = process_question(question, document, model)
return document, question, preview, gr.update(visible=True), answer, answer_text
else:
return None, None, None, gr.update(visible=False), None, None
CSS = """
#question input {
font-size: 16px;
}
#url-textbox {
padding: 0 !important;
}
#short-upload-box .w-full {
min-height: 10rem !important;
}
#select-a-file {
width: 100%;
}
#file-clear {
padding-top: 2px !important;
padding-bottom: 2px !important;
padding-left: 8px !important;
padding-right: 8px !important;
margin-top: 10px;
}
.gradio-container .gr-button-primary {
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
border: 1px solid #B0DCCC;
border-radius: 8px;
color: #1B8700;
}
.gradio-container.dark button#submit-button {
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
border: 1px solid #B0DCCC;
border-radius: 8px;
color: #1B8700
}
table.gr-samples-table tr td {
border: none;
outline: none;
}
table.gr-samples-table tr td:first-of-type {
width: 0%;
}
div#short-upload-box div.absolute {
display: none !important;
}
gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div {
gap: 0px 2%;
}
gradio-app div div div div.w-full, .gradio-app div div div div.w-full {
gap: 0px;
}
gradio-app h2, .gradio-app h2 {
padding-top: 10px;
}
#answer {
overflow-y: scroll;
color: white;
background: #666;
border-color: #666;
font-size: 20px;
font-weight: bold;
}
#answer span {
color: white;
}
#answer textarea {
color:white;
background: #777;
border-color: #777;
font-size: 18px;
}
#url-error input {
color: red;
}
"""
examples = [
[
"scenario-1.png",
"What is the final consignee?",
],
[
"scenario-1.png",
"What are the payment terms?",
],
[
"scenario-2.png",
"What is the actual manufacturer?",
],
[
"scenario-3.png",
'What is the "ship to" destination?',
],
[
"scenario-4.png",
"What is the color?",
],
[
"scenario-5.png",
'What is the "said to contain"?',
],
[
"scenario-5.png",
'What is the "Net Weight"?',
],
[
"scenario-5.png",
'What is the "Freight Collect"?',
],
[
"bill_of_lading_1.png",
"What is the shipper?",
],
[
"japanese-invoice.png",
"What is the total amount?",
]
]
with gr.Blocks(css=CSS) as demo:
gr.Markdown("# Document Question Answer Comparator")
gr.Markdown("""
This space compares some of the latest models that can be used commercially.
- [LayoutLM](https://huggingface.co/impira/layoutlm-document-qa) uses text/layout and images. Uses tesseract for OCR.
- [Donut](https://huggingface.co/naver-clova-ix/donut-base-finetuned-docvqa) OCR free document understanding. Uses vision encoder for OCR and a text decoder for providing the answer.
- [Textract Query](https://docs.aws.amazon.com/textract/latest/dg/what-is.html) OCR + document understanding solution of AWS.
""")
document = gr.Variable()
example_question = gr.Textbox(visible=False)
example_image = gr.Image(visible=False)
with gr.Row(equal_height=True):
with gr.Column():
with gr.Row():
gr.Markdown("## 1. Select a file", elem_id="select-a-file")
img_clear_button = gr.Button(
"Clear", variant="secondary", elem_id="file-clear", visible=False
)
image = gr.Gallery(visible=False)
upload = gr.File(label=None, interactive=True, elem_id="short-upload-box")
gr.Examples(
examples=examples,
inputs=[example_image, example_question],
)
with gr.Column() as col:
gr.Markdown("## 2. Ask a question")
question = gr.Textbox(
label="Question",
placeholder="e.g. What is the invoice number?",
lines=1,
max_lines=1,
)
model = gr.Radio(
choices=list(MODELS.keys()),
value=list(MODELS.keys())[0],
label="Model",
)
with gr.Row():
clear_button = gr.Button("Clear", variant="secondary")
submit_button = gr.Button(
"Submit", variant="primary", elem_id="submit-button"
)
with gr.Column():
output_text = gr.Textbox(
label="Top Answer", visible=False, elem_id="answer"
)
output = gr.JSON(label="Output", visible=False)
for cb in [img_clear_button, clear_button]:
cb.click(
            # One value per output below: image, document, output, output_text,
            # img_clear_button, example_image, upload, question
            lambda _: (
                gr.update(visible=False, value=None),
                None,
                gr.update(visible=False, value=None),
                gr.update(visible=False, value=None),
                gr.update(visible=False),
                None,
                None,
                None,
            ),
inputs=clear_button,
outputs=[
image,
document,
output,
output_text,
img_clear_button,
example_image,
upload,
question,
],
)
upload.change(
fn=process_upload,
inputs=[upload],
outputs=[document, image, img_clear_button, output, output_text],
)
question.submit(
fn=process_question,
inputs=[question, document, model],
outputs=[image, output, output_text],
)
submit_button.click(
process_question,
inputs=[question, document, model],
outputs=[image, output, output_text],
)
model.change(
process_question,
inputs=[question, document, model],
outputs=[image, output, output_text],
)
example_image.change(
fn=load_example_document,
inputs=[example_image, example_question, model],
outputs=[document, question, image, img_clear_button, output, output_text],
)
if __name__ == "__main__":
demo.launch(enable_queue=False)