import os
import traceback

# Set before the imports below, mirroring the original statement order.
# NOTE(review): presumably this must be in place before tokenizer-backed
# libraries are imported — confirm before moving it.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

from PIL import Image, ImageDraw

import gradio as gr
from gradio import processing_utils
import torch
from docquery import pipeline
from docquery.document import load_bytes, load_document, ImageDocument
from docquery.ocr_reader import get_ocr_reader


def ensure_list(x):
    """Return ``x`` unchanged if it is already a list, else wrap it in one."""
    return x if isinstance(x, list) else [x]


# Display label -> checkpoint identifier handed to ``docquery.pipeline``.
CHECKPOINTS = {
    "LayoutLMv1 for Invoices 🧾": "impira/layoutlm-invoices",
}

# Cache of already-constructed pipelines, keyed by ``(task, model label)``.
PIPELINES = {}


def construct_pipeline(task, model):
    """Return the pipeline for ``task`` and ``model``, building it on first use.

    Args:
        task: Task name forwarded to ``docquery.pipeline``.
        model: A key of ``CHECKPOINTS`` (the display label, not the checkpoint id).

    Returns:
        The cached or newly built pipeline object.

    Raises:
        KeyError: If ``model`` is not a key of ``CHECKPOINTS``.
    """
    # Key on the task as well as the model: caching by model alone would hand
    # back a pipeline built for a different task on a later call.
    cache_key = (task, model)
    try:
        return PIPELINES[cache_key]
    except KeyError:
        pass

    device = "cuda" if torch.cuda.is_available() else "cpu"
    built = pipeline(task=task, model=CHECKPOINTS[model], device=device)
    PIPELINES[cache_key] = built
    return built


def run_pipeline(model, question, document, top_k):
    """Ask ``question`` about ``document`` with the pipeline for ``model``.

    ``document.context`` is expanded into keyword arguments of the pipeline
    call, alongside ``question`` and ``top_k``. The pipeline's result is
    returned as-is.
    """
    # Named ``qa_pipeline`` so the imported ``pipeline`` factory is not shadowed.
    qa_pipeline = construct_pipeline("document-question-answering", model)
    return qa_pipeline(question=question, **document.context, top_k=top_k)


# TODO: Move into docquery
# TODO: Support words past the first page (or window?)
def lift_word_boxes(document, page):
    """Return element ``[1]`` of the ``page`` entry in ``document.context["image"]``.

    NOTE(review): presumably that element is the page's list of word boxes
    (each indexable so that ``box[1]`` is a coordinate 4-tuple, as
    ``expand_bbox`` expects) — confirm against docquery.
    """
    return document.context["image"][page][1]


def expand_bbox(word_boxes):
    """Return the smallest ``[min_x, min_y, max_x, max_y]`` covering all boxes.

    Each item of ``word_boxes`` must carry its 4-coordinate box at index 1.
    Returns ``None`` when ``word_boxes`` is empty.
    """
    if not word_boxes:
        return None

    xs_min, ys_min, xs_max, ys_max = zip(*[entry[1] for entry in word_boxes])
    return [min(xs_min), min(ys_min), max(xs_max), max(ys_max)]


# LayoutLM boxes are normalized to 0, 1000
def normalize_bbox(box, width, height, padding=0.005):
    """Scale a 0..1000 box to pixel coordinates for a ``width`` x ``height`` image.

    Args:
        box: ``[min_x, min_y, max_x, max_y]`` in the 0..1000 range.
        width: Target image width in pixels.
        height: Target image height in pixels.
        padding: Margin added on every side, as a fraction of the image size.
            When non-zero the padded box is clamped to the ``[0, 1]`` range;
            with ``padding == 0`` no clamping is applied.

    Returns:
        ``[min_x, min_y, max_x, max_y]`` in pixels.
    """
    min_x, min_y, max_x, max_y = [c / 1000 for c in box]
    if padding != 0:
        min_x = max(0, min_x - padding)
        min_y = max(0, min_y - padding)
        max_x = min(max_x + padding, 1)
        max_y = min(max_y + padding, 1)
    return [min_x * width, min_y * height, max_x * width, max_y * height]


# Rows for ``gr.Examples``: [image file, example title].
EXAMPLES = [
    [
        "acze_tech.png",
        "Tech Invoice",
    ],
    [
        "acze.png",
        "Commercial Goods Invoice",
    ],
    [
        "north_sea.png",
        "Energy Invoice",
    ],
]

# Example title -> document file loaded instead of the example image.
QUESTION_FILES = {
    "Tech Invoice": "acze_tech.pdf",
    "Energy Invoice": "north_sea.pdf",
}

# Sanity check: every title in QUESTION_FILES must also appear in EXAMPLES.
for q in QUESTION_FILES:
    assert any(x[1] == q for x in EXAMPLES)

# Field name -> list of questions asked to extract that field.
FIELDS = {
    "Vendor Name": ["Vendor Name - Logo?", "Vendor Name - Address?"],
    "Vendor Address": ["Vendor Address?"],
    "Customer Name": ["Customer Name?"],
    "Customer Address": ["Customer Address?"],
    "Invoice Number": ["Invoice Number?"],
    "Invoice Date": ["Invoice Date?"],
    "Due Date": ["Due Date?"],
    "Subtotal": ["Subtotal?"],
    "Total Tax": ["Total Tax?"],
    "Invoice Total": ["Invoice Total?"],
    "Amount Due": ["Amount Due?"],
    "Payment Terms": ["Payment Terms?"],
    "Remit To Name": ["Remit To Name?"],
    "Remit To Address": ["Remit To Address?"],
}


def empty_table(fields):
    """Return Dataframe kwargs for a read-only table with one empty row per field."""
    return {"value": [[name, None] for name in fields], "interactive": False}


def process_document(document, fields, model, error=None):
    """Build the UI updates for a freshly loaded (or failed) document.

    The returned 7-tuple is ordered like ``submit_outputs``:
    ``(document, fields, image, img_clear_button, url_error, output,
    output_table)``.

    Args:
        document: The loaded document, or ``None`` if nothing was loaded.
        fields: Mapping of field name to the questions used to extract it.
        model: Key of ``CHECKPOINTS`` selecting the model.
        error: Optional error message; when set, the document is discarded
            and the message is shown in the error box.
    """
    if document is not None and error is None:
        preview, json_output, table = process_fields(document, fields, model)
        return (
            document,
            fields,
            preview,
            gr.update(visible=True),
            gr.update(visible=False, value=None),
            json_output,
            table,
        )

    if error is not None:
        error_update = gr.update(visible=True, value=error)
    else:
        # Hide the box explicitly: a bare ``None`` would only blank its value
        # and leave a previously shown error box visible.
        error_update = gr.update(visible=False, value=None)

    return (
        None,
        fields,
        None,
        gr.update(visible=False),
        error_update,
        None,
        gr.update(**empty_table(fields)),
    )


def process_path(path, fields, model):
    """Load the document at ``path`` (if any) and return the UI updates.

    Any exception from ``load_document`` is printed and turned into an error
    message for ``process_document`` rather than propagated.
    """
    error = None
    document = None
    if path:
        try:
            document = load_document(path)
        except Exception as e:
            traceback.print_exc()
            error = str(e)

    return process_document(document, fields, model, error)


def process_upload(file, fields, model):
    """Handle a change of the upload widget by processing ``file.name``."""
    return process_path(file.name if file else None, fields, model)


colors = ["#64A087", "green", "black"]


def annotate_page(prediction, pages, document):
    """Highlight the words of ``prediction`` on its page image, in place.

    Does nothing when ``prediction`` is ``None``, has no ``"word_ids"`` key,
    or references no words. Otherwise draws a translucent green rectangle
    over the bounding box of the referenced words on
    ``pages[prediction["page"]]``.
    """
    if prediction is None or "word_ids" not in prediction:
        return

    page_index = prediction["page"]
    image = pages[page_index]
    word_boxes = lift_word_boxes(document, page_index)
    bbox = expand_bbox([word_boxes[i] for i in prediction["word_ids"]])
    if bbox is None:
        # An empty ``word_ids`` list yields no box; there is nothing to draw.
        return

    x1, y1, x2, y2 = normalize_bbox(bbox, image.width, image.height)
    draw = ImageDraw.Draw(image, "RGBA")
    draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255)))


def process_question(
    question, document, img_gallery, model, fields, output, output_table
):
    """Answer a user-typed question and prepend it to the results.

    The returned 5-tuple is ordered like the handler's outputs:
    ``(question, image, fields, output, output_table)``. The first element
    is always ``None``, which clears the question box.

    Args:
        question: Text typed by the user; also used as the new field's name.
        document: The currently loaded document, or ``None``.
        img_gallery: Gallery value; each item is decoded with
            ``processing_utils.decode_base64_to_image``.
        model: Key of ``CHECKPOINTS`` selecting the model.
        fields: Current mapping of field name to questions.
        output: Current JSON output mapping (may be ``None``).
        output_table: Current results table; ``.values.tolist()`` is called
            on it, so it is expected to be DataFrame-like.
    """
    # Register a non-empty question as a field even when no document is
    # loaded yet. An empty string must not become a field.
    if question:
        fields = {question: [question], **fields}

    if not question or document is None:
        # ``gr.update()`` leaves the gallery untouched; the document object
        # itself is not a valid gallery value.
        return None, gr.update(), fields, output, gr.update(value=output_table)

    field_name = question
    pages = [processing_utils.decode_base64_to_image(p) for p in img_gallery]

    # ``run_pipeline`` may hand back a single answer or a list of answers
    # (``process_fields`` normalizes it the same way).
    predictions = ensure_list(run_pipeline(model, question, document, 1))
    prediction = predictions[0] if predictions else None
    annotate_page(prediction, pages, document)

    output = {field_name: prediction, **(output or {})}
    answer = prediction.get("answer") if prediction is not None else None
    table = [[field_name, answer]] + output_table.values.tolist()
    return (
        None,
        gr.update(visible=True, value=pages),
        fields,
        output,
        gr.update(value=table, interactive=False),
    )


def process_fields(document, fields, model=list(CHECKPOINTS.keys())[0]):
    """Extract every field from ``document`` and annotate the page previews.

    For each field, all of its questions are asked; answers whose ``score``
    is not above 0.5 are dropped (a missing score counts as 1 for this
    filter) and the highest-scoring remaining answer wins (a missing score
    counts as 0 for ranking; ties keep the earliest answer).

    Returns:
        A 3-tuple ``(gallery update with annotated pages, JSON update with
        {field: top answer or None}, table rows [[field, answer or None]])``.
    """
    pages = [x.copy().convert("RGB") for x in document.preview]

    ret = {}
    table = []

    for field_name, questions in fields.items():
        answers = [
            a
            for q in questions
            for a in ensure_list(run_pipeline(model, q, document, top_k=1))
            if a.get("score", 1) > 0.5
        ]
        top = max(answers, key=lambda a: a.get("score", 0), default=None)
        annotate_page(top, pages, document)
        ret[field_name] = top
        table.append([field_name, top.get("answer") if top is not None else None])

    return (
        gr.update(visible=True, value=pages),
        gr.update(visible=True, value=ret),
        table,
    )


def load_example_document(img, title, fields, model):
    """Load the document behind a clicked example and return the UI updates.

    If ``title`` has an entry in ``QUESTION_FILES`` that file is loaded;
    otherwise the example image array itself is wrapped in an
    ``ImageDocument``. Load failures are printed and reported through the
    error box, the same way ``process_path`` does.
    """
    document = None
    error = None
    if img is not None:
        try:
            if title in QUESTION_FILES:
                document = load_document(QUESTION_FILES[title])
            else:
                document = ImageDocument(
                    Image.fromarray(img), ocr_reader=get_ocr_reader()
                )
        except Exception as e:
            traceback.print_exc()
            error = str(e)

    return process_document(document, fields, model, error)


CSS = """
#question input {
    font-size: 16px;
}
#url-textbox, #question-textbox {
    padding: 0 !important;
}
#short-upload-box .w-full {
    min-height: 10rem !important;
}
/* I think something like this can be used to re-shape
 * the table
 */
/*
.gr-samples-table tr {
    display: inline;
}
.gr-samples-table .p-2 {
    width: 100px;
}
*/
#select-a-file {
    width: 100%;
}
#file-clear {
    padding-top: 2px !important;
    padding-bottom: 2px !important;
    padding-left: 8px !important;
    padding-right: 8px !important;
    margin-top: 10px;
}
.gradio-container .gr-button-primary {
    background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
    border: 1px solid #B0DCCC;
    border-radius: 8px;
    color: #1B8700;
}
.gradio-container.dark button#submit-button {
    background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%);
    border: 1px solid #B0DCCC;
    border-radius: 8px;
    color: #1B8700
}
table.gr-samples-table tr td {
    border: none;
    outline: none;
}
table.gr-samples-table tr td:first-of-type {
    width: 0%;
}
div#short-upload-box div.absolute {
    display: none !important;
}
gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div {
    gap: 0px 2%;
}
gradio-app div div div div.w-full, .gradio-app div div div div.w-full {
    gap: 0px;
}
gradio-app h2, .gradio-app h2 {
    padding-top: 10px;
}
#answer {
    overflow-y: scroll;
    color: white;
    background: #666;
    border-color: #666;
    font-size: 20px;
    font-weight: bold;
}
#answer span {
    color: white;
}
#answer textarea {
    color:white;
    background: #777;
    border-color: #777;
    font-size: 18px;
}
#url-error input {
    color: red;
}
#results-table {
    max-height: 600px;
    overflow-y: scroll;
}
"""

# NOTE(review): the nesting of the layout below was reconstructed from the
# order of the context managers — confirm it against the rendered UI.
with gr.Blocks(css=CSS) as demo:
    gr.Markdown("# DocQuery for Invoices")
    gr.Markdown(
        "DocQuery (created by [Impira](https://impira.com?utm_source=huggingface&utm_medium=referral&utm_campaign=invoices_space))"
        " uses LayoutLMv1 fine-tuned on an invoice dataset"
        " as well as DocVQA and SQuAD, which boot its general comprehension skills. The model is an enhanced"
        " QA architecture that supports selecting blocks of text which may be non-consecutive, which is a major"
        " issue when dealing with invoice documents (e.g. addresses)."
        " To use it, simply upload an image or PDF invoice and the model will predict values for several fields."
        " You can also create additional fields by simply typing in a question."
        " DocQuery is available on [Github](https://github.com/impira/docquery)."
    )

    # Per-session state: the loaded document and the active field mapping.
    document = gr.Variable()
    fields = gr.Variable(value={**FIELDS})
    # Hidden components that receive the values of a clicked example row.
    example_question = gr.Textbox(visible=False)
    example_image = gr.Image(visible=False)

    with gr.Row(equal_height=True):
        with gr.Column():
            with gr.Row():
                gr.Markdown("## Select an invoice", elem_id="select-a-file")
                img_clear_button = gr.Button(
                    "Clear", variant="secondary", elem_id="file-clear", visible=False
                )
            image = gr.Gallery(visible=False)
            with gr.Row(equal_height=True):
                with gr.Column():
                    with gr.Row():
                        url = gr.Textbox(
                            show_label=False,
                            placeholder="URL",
                            lines=1,
                            max_lines=1,
                            elem_id="url-textbox",
                        )
                        submit = gr.Button("Get")
                    url_error = gr.Textbox(
                        visible=False,
                        elem_id="url-error",
                        max_lines=1,
                        interactive=False,
                        label="Error",
                    )
            gr.Markdown("— or —")
            upload = gr.File(label=None, interactive=True, elem_id="short-upload-box")
            gr.Examples(
                examples=EXAMPLES,
                inputs=[example_image, example_question],
            )

        with gr.Column() as col:
            gr.Markdown("## Results")
            with gr.Tabs():
                with gr.TabItem("Table"):
                    output_table = gr.Dataframe(
                        headers=["Field", "Value"],
                        **empty_table(fields.value),
                        elem_id="results-table"
                    )

                with gr.TabItem("JSON"):
                    output = gr.JSON(label="Output", visible=True)

            model = gr.Radio(
                choices=list(CHECKPOINTS.keys()),
                value=list(CHECKPOINTS.keys())[0],
                label="Model",
                visible=False,
            )

            gr.Markdown("### Ask a question")
            with gr.Row():
                question = gr.Textbox(
                    label="Question",
                    show_label=False,
                    placeholder="e.g. What is the invoice number?",
                    lines=1,
                    max_lines=1,
                    elem_id="question-textbox",
                )
                clear_button = gr.Button("Clear", variant="secondary", visible=False)
                submit_button = gr.Button(
                    "Add", variant="primary", elem_id="submit-button"
                )

    def reset_ui(_):
        """Return the updates that reset the UI; order matches ``outputs`` below."""
        return (
            gr.update(visible=False, value=None),  # image
            None,  # document
            # {**FIELDS},  # fields
            gr.update(value=None),  # output
            gr.update(**empty_table(fields.value)),  # output_table
            gr.update(visible=False),  # img_clear_button
            None,  # example_image
            None,  # upload
            None,  # url
            gr.update(visible=False, value=None),  # url_error
            None,  # question
        )

    for cb in [img_clear_button, clear_button]:
        cb.click(
            reset_ui,
            inputs=clear_button,
            outputs=[
                image,
                document,
                # fields,
                output,
                output_table,
                img_clear_button,
                example_image,
                upload,
                url,
                url_error,
                question,
            ],
        )

    # Shared output list for every handler that loads a document; the tuples
    # returned by ``process_document`` follow this order.
    submit_outputs = [
        document,
        fields,
        image,
        img_clear_button,
        url_error,
        output,
        output_table,
    ]

    upload.change(
        fn=process_upload,
        inputs=[upload, fields, model],
        outputs=submit_outputs,
    )

    submit.click(
        fn=process_path,
        inputs=[url, fields, model],
        outputs=submit_outputs,
    )

    for action in [question.submit, submit_button.click]:
        action(
            fn=process_question,
            inputs=[question, document, image, model, fields, output, output_table],
            outputs=[question, image, fields, output, output_table],
        )

    # model.change(
    #     process_question,
    #     inputs=[question, document, model],
    #     outputs=[image, output, output_table],
    # )

    example_image.change(
        fn=load_example_document,
        inputs=[example_image, example_question, fields, model],
        outputs=submit_outputs,
    )

if __name__ == "__main__":
    demo.launch(enable_queue=False)