"""
Copyright (c) Meta Platforms, Inc. and affiliates.

This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
"""
import argparse
import logging
import os
import re
import subprocess
import sys
import uuid
from functools import lru_cache, partial
from pathlib import Path
from urllib.parse import urlparse, unquote

# To pin the process to a single GPU on a multi-GPU machine, set these
# *before* torch is imported:
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = "1"  # just use one GPU on big machine
import fitz
import gradio as gr
import requests
import torch
from PIL import Image, ImageOps
from torch.utils.data import ConcatDataset
from tqdm import tqdm

from nougat import NougatModel
from nougat.postprocessing import markdown_compatible
from nougat.utils.checkpoint import get_checkpoint
from nougat.utils.dataset import LazyDataset

print('Available devices ', torch.cuda.device_count())
if torch.cuda.is_available():
    # Only query the current device when CUDA exists; on a CPU-only host
    # torch.cuda.current_device() raises, which would make the CPU fallback
    # further down unreachable.
    print('Current cuda device ', torch.cuda.current_device())
    if torch.cuda.device_count() != 1:
        # Explicit raise instead of `assert`, which is stripped under -O.
        raise RuntimeError(
            "Expected exactly one visible CUDA device, found "
            f"{torch.cuda.device_count()}; restrict with CUDA_VISIBLE_DEVICES."
        )
    print('GPU Device name:', torch.cuda.get_device_name(torch.cuda.current_device()))

# Clear proxy settings for outgoing HTTP(S) requests.
os.environ['http_proxy'] = ""
os.environ['https_proxy'] = ""

logging.basicConfig(level=logging.INFO)

if torch.cuda.is_available():
    # Heuristic carried over from the original script: derive the batch size
    # from the total memory of device 0.
    BATCH_SIZE = int(
        torch.cuda.get_device_properties(0).total_memory / 1024 / 1024 / 1000 * 0.3
    )
    if BATCH_SIZE == 0:
        logging.warning("GPU VRAM is too small. Computing on CPU.")
else:
    # don't know what a good value is here. Would not recommend to run on CPU
    BATCH_SIZE = 1
    logging.warning("No GPU found. Conversion on CPU is very slow.")

# Seconds to wait on a stalled HTTP connection before giving up.
HTTP_TIMEOUT = 60

# Text shown in the output pane before any conversion has run.
OUTPUT_PLACEHOLDER = r'OCR Output📃🔤'

# create a new input directory for pdf downloads
os.makedirs("input", exist_ok=True)


def _switch_math_delimiters(text):
    r"""Replace ``\(``, ``\)``, ``\[``, ``\]`` with ``\$`` / ``\$\$``."""
    return (
        text.replace(r"\(", r"\$")
        .replace(r"\)", r"\$")
        .replace(r"\[", r"\$\$")
        .replace(r"\]", r"\$\$")
    )


@lru_cache(maxsize=1)
def _load_model(checkpoint, use_cuda):
    """Load the Nougat model once and reuse it across requests.

    The model is cast to float16, moved to CUDA when ``use_cuda`` is true,
    and put in eval mode.
    """
    # NOTE(review): float16 is kept for the CPU path as in the original;
    # confirm that half precision inference is supported on the target CPU.
    model = NougatModel.from_pretrained(checkpoint).to(torch.float16)
    if use_cuda:
        model.to("cuda")
    model.eval()
    return model


def get_pdf(pdf_link):
    """Download ``pdf_link`` into ``input/`` under a unique file name.

    Returns the chosen file name in every case. When the download fails
    (non-200 status or a network error) nothing is written, so callers
    should check that the returned path exists.
    """
    # Generate a unique filename
    unique_filename = f"input/downloaded_paper_{uuid.uuid4().hex}.pdf"

    try:
        response = requests.get(pdf_link, timeout=HTTP_TIMEOUT)
    except requests.RequestException as err:
        # Treat network errors like a non-200 answer instead of crashing.
        logging.warning("Request for %s failed: %s", pdf_link, err)
        print("Failed to download the PDF.")
        return unique_filename

    if response.status_code == 200:
        # Save the PDF content to a local file
        with open(unique_filename, 'wb') as pdf_file:
            pdf_file.write(response.content)
        print("PDF downloaded successfully.")
    else:
        print("Failed to download the PDF.")
    return unique_filename


def get_image(url_list):
    """Download every URL in ``url_list`` into ``./input``.

    Each file is named after the last component of the URL path. URLs that
    cannot be fetched are skipped.
    """
    target_dir = Path("./input")
    for url in url_list:
        new_file = os.path.join(target_dir, os.path.basename(urlparse(url).path))
        try:
            with requests.get(url, stream=True, timeout=HTTP_TIMEOUT) as response:
                if not response.ok:
                    continue
                with open(new_file, mode="wb") as file:
                    # Explicit chunk size: the default of 1 byte per chunk
                    # makes the copy loop needlessly slow.
                    for data in tqdm(response.iter_content(chunk_size=8192)):
                        file.write(data)
        except requests.RequestException as err:
            logging.warning("Request for %s failed: %s", url, err)


# Download a sample pdf file - https://arxiv.org/pdf/2308.13418.pdf (nougat paper)
url_list = [
    "https://cdn.mathpix.com/snip/images/Hm62Ib-dDZOseYuVNN8k34IhBY18KglOrM7qETOqXZI.original.fullsize.png",
    "https://cdn.mathpix.com/snip/images/lSL07DYTL1bdjzL2mpNyVg17JmqKwgugMLyGuxkLgLg.original.fullsize.png",
]
get_pdf("https://arxiv.org/pdf/2308.13418.pdf")
get_image(url_list)


def nougat_predict(input_files, output_path, checkpoint, batchsize, markdown, recompute):
    """Convert the given PDF files with Nougat and write ``.mmd`` files.

    Args:
        input_files: iterable of PDF paths.
        output_path: directory for the ``.mmd`` results; when falsy the
            result is printed instead of written.
        checkpoint: model checkpoint passed to ``NougatModel.from_pretrained``.
        batchsize: DataLoader batch size; forced to 1 when it is not positive
            or CUDA is unavailable.
        markdown: when true, run ``markdown_compatible`` on each page.
        recompute: when false, inputs whose ``.mmd`` already exists are skipped.

    Returns:
        The path of the last ``.mmd`` file considered, or ``None`` when no
        output path could be determined (e.g. the input does not exist).
    """
    print(f'*** nougat predict with input :{input_files} ***')

    use_cuda = batchsize > 0 and torch.cuda.is_available()
    if not use_cuda:
        # set batch size to 1. Need to check if there are benefits for CPU conversion for >1
        batchsize = 1
    model = _load_model(checkpoint, use_cuda)

    output_path = Path(output_path) if output_path else None
    # Initialised up front so the early return and the final print never hit
    # an unbound name when an input is missing or no output dir is given.
    out_path = None
    datasets = []
    for pdf in input_files:
        pdf = Path(pdf)
        if not pdf.exists():
            continue
        if output_path:
            out_path = output_path / pdf.with_suffix(".mmd").name
            if out_path.exists() and not recompute:
                logging.info(
                    f"Skipping {pdf.name}, already computed. Run with --recompute to convert again."
                )
                continue
        try:
            dataset = LazyDataset(
                pdf, partial(model.encoder.prepare_input, random_padding=False)
            )
        except fitz.fitz.FileDataError:
            logging.info(f"Could not load file {str(pdf)}.")
            continue
        datasets.append(dataset)

    if len(datasets) == 0:
        print(f'*** nougat out files :{out_path} ***')
        return out_path

    dataloader = torch.utils.data.DataLoader(
        ConcatDataset(datasets),
        batch_size=batchsize,
        shuffle=False,
        collate_fn=LazyDataset.ignore_none_collate,
    )

    predictions = []
    file_index = 0
    page_num = 0
    for i, (sample, is_last_page) in enumerate(tqdm(dataloader)):
        model_output = model.inference(image_tensors=sample)
        # check if model output is faulty
        for j, output in enumerate(model_output["predictions"]):
            if page_num == 0:
                logging.info(
                    "Processing file %s with %i pages"
                    % (datasets[file_index].name, datasets[file_index].size)
                )
            page_num += 1
            if output.strip() == "[MISSING_PAGE_POST]":
                # uncaught repetitions -- most likely empty page
                predictions.append(f"\n\n[MISSING_PAGE_EMPTY:{page_num}]\n\n")
            elif model_output["repeats"][j] is not None:
                if model_output["repeats"][j] > 0:
                    # If we end up here, it means the output is most likely not complete and was truncated.
                    logging.warning(f"Skipping page {page_num} due to repetitions.")
                    predictions.append(f"\n\n[MISSING_PAGE_FAIL:{page_num}]\n\n")
                else:
                    # If we end up here, it means the document page is too different from the training domain.
                    # This can happen e.g. for cover pages.
                    predictions.append(
                        f"\n\n[MISSING_PAGE_EMPTY:{i*batchsize+j+1}]\n\n"
                    )
            else:
                if markdown:
                    output = markdown_compatible(output)
                predictions.append(output)
            if is_last_page[j]:
                # A truthy entry marks the last page of a file; it is used
                # below as the name of that file.
                out = "".join(predictions).strip()
                out = re.sub(r"\n{3,}", "\n\n", out).strip()
                if output_path:
                    out_path = output_path / Path(is_last_page[j]).with_suffix(".mmd").name
                    out_path.parent.mkdir(parents=True, exist_ok=True)
                    out_path.write_text(out, encoding="utf-8")
                else:
                    print(out, "\n\n")
                predictions = []
                page_num = 0
                file_index += 1

    print(f'the generated markdown file is : {out_path}')
    return out_path


def nougat_ocr(file_name):
    """Build the ``nougat`` CLI invocation for ``file_name``.

    The subprocess call is currently disabled, so this is a no-op that
    returns ``None``; conversion goes through ``nougat_predict`` instead.
    """
    # Command to run
    cli_command = [
        'nougat',
        '--out', 'output',
        'pdf', f'{file_name}',
        '--checkpoint', 'nougat',
        '--markdown',
    ]
    # Disabled on purpose:
    # subprocess.run(cli_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    return


def resize_with_padding(img, expected_size):
    """Fit ``img`` into ``expected_size`` (width, height) on a white canvas.

    The image is shrunk in place with ``Image.thumbnail`` and then padded
    with white borders, split as evenly as possible between opposite sides.
    """
    target_w, target_h = expected_size[0], expected_size[1]
    img.thumbnail((target_w, target_h))
    delta_width = target_w - img.size[0]
    delta_height = target_h - img.size[1]
    left = delta_width // 2
    top = delta_height // 2
    # Border order for ImageOps.expand: (left, top, right, bottom).
    border = (left, top, delta_width - left, delta_height - top)
    return ImageOps.expand(img, border, fill='white')


def predict_image(checkpoint, images, batchsize=1, markdown=True, out_path=""):
    """Run Nougat on a single image and write the result as ``.mmd``.

    Args:
        checkpoint: model checkpoint passed to ``NougatModel.from_pretrained``.
        images: object with a ``.name`` attribute holding the image path,
            or ``None``.
        batchsize: only used to decide whether CUDA is used (must be > 0).
        markdown: when true, run ``markdown_compatible`` on the prediction.
        out_path: output directory.

    Returns:
        Path of the written ``.mmd`` file. When the model produced no text
        (or ``images`` is ``None``) nothing is written and ``out_path`` is
        returned unchanged.
    """
    predictions = []
    if images is not None:
        use_cuda = batchsize > 0 and torch.cuda.is_available()
        model = _load_model(checkpoint, use_cuda)

        print("we are under image to mmd convertiong")
        with Image.open(images.name) as raw:
            sample = raw.convert('RGB')
        # Pad to 672x896 (width, height) before handing over to the encoder.
        im_new = resize_with_padding(sample, (672, 896))
        img_tensor = model.encoder.prepare_input(im_new, random_padding=False)
        img_tensor = img_tensor.unsqueeze(0)
        model_output = model.inference(image_tensors=img_tensor)
        for output in model_output["predictions"]:
            # Post-process *before* collecting; previously the converted
            # text was computed after the append and silently discarded.
            if markdown:
                output = markdown_compatible(output)
            predictions.append(output)

    out = "".join(predictions).strip()
    out = re.sub(r"\n{3,}", "\n\n", out).strip()
    if out:
        out_path = Path(out_path) / Path(images.name).with_suffix(".mmd").name
        out_path.parent.mkdir(parents=True, exist_ok=True)
        # Opening for write truncates, so no prior removal is needed.
        out_path.write_text(_switch_math_delimiters(out), encoding="utf-8")
    else:
        print(out, "\n\n")
    return out_path


# predict function / driver function
def paper_read(pdf_file, pdf_link, img_file):
    """Convert an uploaded image, an uploaded PDF or a PDF link to markup.

    An image takes precedence over a PDF file, which takes precedence over
    a link. Returns the converted text with switched math delimiters, or a
    human-readable message when there is no input or no output was produced.
    """
    output_path = Path("./output")
    checkpoint = Path("/workspace/nougat-latex/nougat_big_facebook/")
    markdown = True
    batchsize = BATCH_SIZE

    if img_file is not None:
        output_files = predict_image(
            checkpoint, img_file, markdown=True, out_path=Path("output")
        )
    else:
        if pdf_file is not None:
            file_name = pdf_file.name
            pdf_name = pdf_file.name.split('/')[-1].split('.')[0]
            print(pdf_name)
        elif pdf_link and pdf_link.strip():
            file_name = get_pdf(pdf_link)
        else:
            print("No file is uploaded and No link is provided")
            return "No data provided. Upload a pdf file or provide a pdf link and try again!"

        # nougat_predict expects an iterable of paths.
        input_files = (
            file_name if isinstance(file_name, os.PathLike) else Path(file_name),
        )
        output_files = nougat_predict(
            input_files=input_files,
            output_path=output_path,
            checkpoint=checkpoint,
            batchsize=batchsize,
            markdown=markdown,
            recompute=False,
        )

    print(f'the generated markdown file is : {output_files}')
    # The helpers may hand back None, a directory or a path that was never
    # written (failed download, unreadable PDF, empty prediction).
    if output_files is None or not Path(output_files).is_file():
        return "Conversion failed: no output was produced for the given input."

    content = Path(output_files).read_text(encoding="utf-8")
    # switch math delimiters
    content = _switch_math_delimiters(content)
    print("***********************************")
    print("convert successfully")
    print("***********************************")
    return content


# Handling examples in Gradio app
def process_example(pdf_file, pdf_link, img_file):
    """Run ``paper_read`` for a clicked example and wrap it as an update."""
    ocr_content = paper_read(pdf_file, pdf_link, img_file)
    return gr.update(value=ocr_content)


css = """
  #mkd {
    height: 500px;
    overflow: auto;
    border: 1px solid #ccc;
  }
"""

with gr.Blocks(css=css) as demo:
    gr.HTML("\n\nNougat: Neural Optical Understanding for Academic Documents\n\n")
    gr.HTML("\n\nLukas Blecher et al. Paper, Project\n\n")

    with gr.Row():
        mkd = gr.Markdown('\n\nUpload a PDF\n\n')
        mkd = gr.Markdown('\n\nOR\n\n')
        mkd = gr.Markdown('\n\nProvide a PDF link\n\n')

    with gr.Row(equal_height=True):
        pdf_file = gr.File(label='PDF📃', file_count='single')
        pdf_link = gr.Textbox(placeholder='Enter an Arxiv link here', label='PDF link🔗🌐')
        img_file = gr.File(label='IMG📃', file_count='single')

    with gr.Row():
        btn = gr.Button('Run NOUGAT🍫')
        clr = gr.Button('Clear🚿')

    output_headline = gr.Markdown(
        "\n\nPDF converted into markup language through Nougat-OCR👇:\n\n"
    )
    parsed_output = gr.Markdown(
        OUTPUT_PLACEHOLDER,
        elem_id='mkd',
        latex_delimiters=[
            {"left": r"\(", "right": r"\)", "display": False},
            {"left": r"\[", "right": r"\]", "display": True},
        ],
    )

    btn.click(paper_read, [pdf_file, pdf_link, img_file], parsed_output)
    # One update per output component: the three inputs are cleared and the
    # output pane is reset to its placeholder.
    clr.click(
        lambda: (
            gr.update(value=None),
            gr.update(value=None),
            gr.update(value=None),
            gr.update(value=OUTPUT_PLACEHOLDER),
        ),
        [],
        [pdf_file, pdf_link, img_file, parsed_output],
    )

    gr.Examples(
        [
            ["./input/test.pdf", "", None],
            [None, "https://arxiv.org/pdf/2308.08316.pdf", None],
            [None, "", "./input/Hm62Ib-dDZOseYuVNN8k34IhBY18KglOrM7qETOqXZI.original.fullsize.png"],
        ],
        inputs=[pdf_file, pdf_link, img_file],
        outputs=parsed_output,
        fn=process_example,
        cache_examples=False,
        label='Click on any Examples below to get Nougat OCR results quickly:',
    )

demo.queue()
demo.launch(debug=True, share=True, server_name="0.0.0.0", server_port=8855)