"""
Plain-text extraction from PDF files using the external ``pdftotext`` and
``pdf2txt.py`` command line tools, plus helpers to convert single files or
whole directories of PDFs to ``.txt`` files.
"""
import os
import re
import sys
import glob
import shlex
from functools import partial
from multiprocessing import Pool
from subprocess import (
    check_call, CalledProcessError, TimeoutExpired, PIPE, DEVNULL
)

from arxiv_public_data.config import LOGGER
from arxiv_public_data import fixunicode, pdfstamp

log = LOGGER.getChild('fulltext')
TIMELIMIT = 2*60
STAMP_SEARCH_LIMIT = 1000

PDF2TXT = 'pdf2txt.py'
PDFTOTEXT = 'pdftotext'

RE_REPEATS = r'(\(cid:\d+\)|lllll|\.\.\.\.\.|\*\*\*\*\*)'

# Extracted text whose average "word length" (see `average_word_length`)
# exceeds this value is treated as a failed extraction by `fulltext`.
_MAX_AVG_WORD_LENGTH = 45

# Extensions of the intermediate files written next to the PDF by
# `run_pdftotext` and `run_pdf2txt` respectively.
_TMP_EXTENSIONS = ('pdftotxt', 'pdf2txt')


def reextension(filename: str, extension: str) -> str:
    """ Give a filename a new extension

    Parameters
    ----------
        filename : str
            Path whose extension (as split by `os.path.splitext`) is replaced
        extension : str
            New extension, without the leading dot

    Returns
    -------
        newname : str
            `filename` with its extension replaced by `.extension`
    """
    name, _ = os.path.splitext(filename)
    return '{}.{}'.format(name, extension)


def average_word_length(txt):
    """
    Gather statistics about the text, primarily the average word length

    The value is the total number of characters (whitespace included)
    divided by the number of whitespace-separated words plus one, so empty
    text yields 0.0 instead of a division by zero.

    Parameters
    ----------
        txt : str

    Returns
    -------
        word_length : float
            Average word length in the text
    """
    nw = len(txt.split())
    nc = len(txt)
    avgw = nc / (nw + 1)
    return avgw


def process_timeout(cmd, timeout):
    """
    Run `cmd` and wait for it, raising if it fails or takes too long.

    Parameters
    ----------
        cmd : list of str
            Command and arguments to execute
        timeout : int
            Number of seconds to wait for the process to complete

    Returns
    -------
        returncode : int
            Always 0; a non-zero exit raises instead

    Raises
    ------
        CalledProcessError
            If the command exits with a non-zero status
        TimeoutExpired
            If the command does not finish within `timeout` seconds
    """
    # The child's output is never read here, so it is discarded rather than
    # piped: an unread pipe can fill up and block the child indefinitely.
    return check_call(cmd, timeout=timeout, stdout=DEVNULL, stderr=DEVNULL)


# ============================================================================
#  functions for calling the text extraction services
# ============================================================================
def run_pdf2txt(pdffile: str, timelimit: int=TIMELIMIT, options: str=''):
    """
    Run pdf2txt to extract full text

    The text is written to a file next to the PDF with the extension
    `.pdf2txt` and read back from there; the file is left in place.

    Parameters
    ----------
        pdffile : str
            Path to PDF file

        timelimit : int
            Amount of time to wait for the process to complete

        options : str
            Extra command line options for pdf2txt, split shell-style

    Returns
    -------
        output : str
            Full plain text output
    """
    log.debug('Running {} on {}'.format(PDF2TXT, pdffile))
    tmpfile = reextension(pdffile, 'pdf2txt')

    # Build the argument list directly so that paths containing quotes or
    # backslashes are passed through untouched.
    cmd = [PDF2TXT] + shlex.split(options) + ['-o', tmpfile, pdffile]
    process_timeout(cmd, timeout=timelimit)

    with open(tmpfile) as f:
        return f.read()


def run_pdftotext(pdffile: str, timelimit: int = TIMELIMIT) -> str:
    """
    Run pdftotext on PDF file for extracted plain text

    The text is written to a file next to the PDF with the extension
    `.pdftotxt` and read back from there; the file is left in place.

    Parameters
    ----------
        pdffile : str
            Path to PDF file

        timelimit : int
            Amount of time to wait for the process to complete

    Returns
    -------
        output : str
            Full plain text output
    """
    log.debug('Running {} on {}'.format(PDFTOTEXT, pdffile))
    tmpfile = reextension(pdffile, 'pdftotxt')

    # Argument list built directly; see `run_pdf2txt`.
    cmd = [PDFTOTEXT, pdffile, tmpfile]
    process_timeout(cmd, timeout=timelimit)

    with open(tmpfile) as f:
        return f.read()


def run_pdf2txt_A(pdffile: str, **kwargs) -> str:
    """
    Run pdf2txt with the -A option which runs 'positional analysis on images'
    and can return better results when pdf2txt combines many words together.

    Parameters
    ----------
        pdffile : str
            Path to PDF file

        kwargs : dict
            Keyword arguments to :func:`run_pdf2txt`

    Returns
    -------
        output : str
            Full plain text output
    """
    return run_pdf2txt(pdffile, options='-A', **kwargs)


def _remove_tempfiles(pdffile: str) -> None:
    """ Best-effort removal of the intermediate text files for `pdffile` """
    for extension in _TMP_EXTENSIONS:
        try:
            os.remove(reextension(pdffile, extension))
        except OSError:
            # the file was never created or is already gone
            pass


# ============================================================================
#  main function which extracts text
# ============================================================================
def fulltext(pdffile: str, timelimit: int = TIMELIMIT):
    """
    Given a pdf file, extract the unicode text and run through very basic
    unicode normalization routines. Determine the best extracted text and
    return as a string.

    pdftotext is tried first, falling back to pdf2txt if it fails. If the
    resulting text has an implausibly large average word length, pdf2txt
    with the -A option is tried as a last resort. Intermediate files written
    by the extraction tools are removed before returning or raising.

    Parameters
    ----------
        pdffile : str
            Path to PDF file from which to extract text

        timelimit : int
            Time in seconds to allow the extraction routines to run

    Returns
    -------
        fulltext : str
            The full plain text of the PDF

    Raises
    ------
        FileNotFoundError
            If `pdffile` is not an existing file
        RuntimeError
            If `pdffile` is empty or no plausible text could be extracted
        TimeoutExpired, CalledProcessError
            If a fallback extraction tool times out or fails
    """
    if not os.path.isfile(pdffile):
        raise FileNotFoundError(pdffile)

    if os.stat(pdffile).st_size == 0:  # file is empty
        raise RuntimeError('"{}" is an empty file'.format(pdffile))

    try:
        try:
            output = run_pdftotext(pdffile, timelimit=timelimit)
        except (TimeoutExpired, CalledProcessError, RuntimeError):
            output = run_pdf2txt(pdffile, timelimit=timelimit)

        # NOTE: stamp removal (pdfstamp, STAMP_SEARCH_LIMIT) is not applied.
        output = fixunicode.fix_unicode(output)
        if average_word_length(output) <= _MAX_AVG_WORD_LENGTH:
            return output

        # Words appear to be glued together; retry with positional analysis.
        output = run_pdf2txt_A(pdffile, timelimit=timelimit)
        output = fixunicode.fix_unicode(output)
        if average_word_length(output) > _MAX_AVG_WORD_LENGTH:
            raise RuntimeError(
                'No accurate text could be extracted from "{}"'.format(pdffile)
            )
        return output
    finally:
        # remove the tempfiles of every tool, on success and on failure
        _remove_tempfiles(pdffile)


def sorted_files(globber: str):
    """
    Give a globbing expression of files to find. They will be sorted upon
    return.  Plain string sorting does not provide numerical order, e.g.:
        9 -> 12 returned as 10 11 12 9 by string sort

    so files are sorted by the numbers found in their path first, and then
    by the path itself.

    Parameters
    ----------
        globber : str
            Expression on which to search for files (bash glob expression);
            `**` is expanded recursively

    Returns
    -------
        files : list of str
            Matching paths in numerical order
    """
    files = glob.glob(globber, recursive=True)

    def numeric_key(fn):
        # Integers compare numerically; keeping the path as a separate tuple
        # element means numbers are never compared against strings.
        return [int(n) for n in re.findall(r'\d+', fn)], fn

    return sorted(files, key=numeric_key)


def convert_directory(path: str, timelimit: int = TIMELIMIT):
    """
    Convert all pdfs in a given `path` to full plain text. For each pdf, a file
    of the same name but extension .txt will be created. If that file exists,
    it will be skipped.

    Parameters
    ----------
        path : str
            Directory in which to search for pdfs and convert to text

        timelimit : int
            Time in seconds to allow each extraction routine to run

    Returns
    -------
        output : list of str
            List of converted files (paths of the PDFs converted by this call)
    """
    outlist = []

    globber = os.path.join(path, '*.pdf')
    pdffiles = sorted_files(globber)

    log.info('Searching "{}"...'.format(globber))
    log.info('Found: {} pdfs'.format(len(pdffiles)))

    for pdffile in pdffiles:
        txtfile = reextension(pdffile, 'txt')

        if os.path.exists(txtfile):
            continue

        # we don't want this function to stop half way because of one failed
        # file so just charge onto the next one
        try:
            text = fulltext(pdffile, timelimit)
            with open(txtfile, 'w') as f:
                f.write(text)
        except Exception as e:
            log.error("Conversion failed for '{}'".format(pdffile))
            log.exception(e)
            continue

        outlist.append(pdffile)
    return outlist


def convert_directory_parallel(path: str, processes: int,
                               timelimit: int = TIMELIMIT):
    """
    Convert all pdfs in a given `path` (including sub directories) to full
    plain text. For each pdf, a file of the same name but extension .txt will
    be created. If that file exists, it will be skipped.

    Parameters
    ----------
        path : str
            Directory in which to search for pdfs and convert to text

        processes : int
            Number of worker processes to use

        timelimit : int
            Time in seconds to allow each extraction routine to run

    Returns
    -------
        output : list of str
            Paths of the PDFs that have a text file after this call, i.e.
            those converted now or skipped because they were converted
            before. PDFs whose conversion failed are left out.
    """
    globber = os.path.join(path, '**/*.pdf')  # search expression for glob.glob
    pdffiles = sorted_files(globber)

    log.info('Searching "{}"...'.format(globber))
    log.info('Found: {} pdfs'.format(len(pdffiles)))

    pool = Pool(processes=processes)
    try:
        results = pool.map(partial(convert_safe, timelimit=timelimit), pdffiles)
    finally:
        pool.close()
        pool.join()

    # `convert_safe` yields None for failed conversions
    return [
        pdffile for pdffile, outpath in zip(pdffiles, results)
        if outpath is not None
    ]


def convert_safe(pdffile: str, timelimit: int = TIMELIMIT):
    """
    Conversion function that never fails

    Parameters
    ----------
        pdffile : str
            Location of a PDF file.

        timelimit : int
            Time in seconds to allow each extraction routine to run

    Returns
    -------
        outpath : str or None
            Location of the text file, or None if the conversion failed
            (the failure is logged).
    """
    try:
        return convert(pdffile, timelimit=timelimit)
    except Exception as e:
        log.error('File conversion failed for {}: {}'.format(pdffile, e))
        return None


def convert(path: str, skipconverted=True, timelimit: int = TIMELIMIT) -> str:
    """
    Convert a single PDF to text.

    Parameters
    ----------
        path : str
            Location of a PDF file.

        skipconverted : boolean
            Skip conversion when there is a text file already

        timelimit : int
            Time in seconds to allow each extraction routine to run

    Returns
    -------
        str
            Location of text file.

    Raises
    ------
        RuntimeError
            If `path` does not exist or the conversion fails
    """
    if not os.path.exists(path):
        raise RuntimeError('No such path: %s' % path)
    outpath = reextension(path, 'txt')

    if skipconverted and os.path.exists(outpath):
        return outpath

    try:
        content = fulltext(path, timelimit)
        with open(outpath, 'w') as f:
            f.write(content)
    except Exception as e:
        msg = "Conversion failed for '%s': %s"
        log.error(msg, path, e)
        raise RuntimeError(msg % (path, e)) from e
    return outpath