""" s3_bulk_download.py authors: Matt Bierbaum and Colin Clement date: 2019-02-27 This module uses AWS to request a signed key url, which requests files from the ArXiv S3 bucket. It then unpacks and converts the pdfs into text. Note that at the time of writing the ArXiv manifest, it contains 1.15 TB of PDFs, which would cost $103 to receive from AWS S3. see: https://arxiv.org/help/bulk_data_s3 Usage ----- Set DIR_FULLTEXT as the directory where the text parsed from pdfs should be placed. Set DIR_PDFTARS as the directory where the raw pdf tars should be placed. ``` import arxiv_public_data.s3_bulk_download as s3 # Download manifest file (or load if already downloaded) >>> manifest = s3.get_manifest() # Download tar files and convert pdf to text # Costs money! Will only download if it does not find files >>> s3.process_manifest_files(manifest) # If you just want to download the PDFs and not convert to text use >>> s3.download_check_tarfiles(manifest) ``` """ import os import re import gzip import json import glob import shlex import shutil import tarfile import boto3 import hashlib import requests import subprocess from functools import partial from multiprocessing import Pool from collections import defaultdict import xml.etree.ElementTree as ET from arxiv_public_data import fulltext from arxiv_public_data.config import DIR_FULLTEXT, DIR_PDFTARS, LOGGER logger = LOGGER.getChild('s3') CHUNK_SIZE = 2**20 # 1MB BUCKET_NAME = 'arxiv' S3_PDF_MANIFEST = 'pdf/arXiv_pdf_manifest.xml' S3_TEX_MANIFEST = 'src/arXiv_src_manifest.xml' HEADERS = {'x-amz-request-payer': 'requester'} s3 = boto3.client('s3', region_name='us-east-1') def download_file(filename, outfile, chunk_size=CHUNK_SIZE, redownload=False, dryrun=False): """ Downloads filename from the ArXiv AWS S3 bucket, and returns streaming md5 sum of the content Parameters ---------- filename : str KEY corresponding to AWS bucket file outfile : stf name and path of local file in which downloaded file will be stored (optional) chunk_size : int requests byte streaming size (so 500MB are not stored in memory prior to processing) redownload : bool Look to see if file is already downloaded, and simply return md5sum if it it exists, unless redownload is True dryrun : bool If True, only log activity Returns ------- md5sum : str md5 checksum of the contents of filename """ if os.path.exists(outfile) and not redownload: md5 = hashlib.md5() md5.update(gzip.open(outfile, 'rb').read()) return md5.hexdigest() md5 = hashlib.md5() url = s3.generate_presigned_url( "get_object", Params={ "Bucket": BUCKET_NAME, "Key": filename, "RequestPayer": 'requester' } ) if not dryrun: logger.info('Requesting "{}" (costs money!)'.format(filename)) request = requests.get(url, stream=True) response_iter = request.iter_content(chunk_size=chunk_size) logger.info("\t Writing {}".format(outfile)) with gzip.open(outfile, 'wb') as fout: for i, chunk in enumerate(response_iter): fout.write(chunk) md5.update(chunk) else: logger.info('Requesting "{}" (free!)'.format(filename)) logger.info("\t Writing {}".format(outfile)) return md5.hexdigest() def default_manifest_filename(): return os.path.join(DIR_PDFTARS, 'arxiv-manifest.xml.gz') def get_manifest(filename=None, redownload=False): """ Get the file manifest for the ArXiv Parameters ---------- redownload : bool If true, forces redownload of manifest even if it exists Returns ------- file_information : list of dicts each dict contains the file metadata """ manifest_file = filename or default_manifest_filename() md5 = download_file( S3_PDF_MANIFEST, manifest_file, redownload=redownload, dryrun=False ) manifest = gzip.open(manifest_file, 'rb').read() return parse_manifest(manifest) def parse_manifest(manifest): """ Parse the XML of the ArXiv manifest file. Parameters ---------- manifest : str xml string from the ArXiv manifest file Returns ------- file_information : list of dicts One dict for each file, containing the filename, size, md5sum, and other metadata """ root = ET.fromstring(manifest) return [ {c.tag: f.find(c.tag).text for c in f.getchildren()} for f in root.findall('file') ] def _tar_to_filename(filename): return os.path.join(DIR_PDFTARS, os.path.basename(filename)) + '.gz' def download_check_tarfile(filename, md5_expected, dryrun=False, redownload=False): """ Download filename, check its md5sum, and form the output path """ outname = _tar_to_filename(filename) md5_downloaded = download_file( filename, outname, dryrun=dryrun, redownload=redownload ) if not dryrun: if md5_expected != md5_downloaded: msg = "MD5 '{}' does not match expected '{}' for file '{}'".format( md5_downloaded, md5_expected, filename ) raise AssertionError(msg) return outname def download_check_tarfiles(list_of_fileinfo, dryrun=False): """ Download tar files from the ArXiv manifest and check that their MD5sums match Parameters ---------- list_of_fileinfo : list Some elements of results of get_manifest (optional) dryrun : bool If True, only log activity """ for fileinfo in list_of_fileinfo: download_check_tarfile(fileinfo['filename'], fileinfo['md5sum'], dryrun=dryrun) def call(cmd, dryrun=False, debug=False): """ Spawn a subprocess and execute the string in cmd """ if dryrun: logger.info(cmd) return 0 else: return subprocess.check_call( shlex.split(cmd), stderr=None if debug else open(os.devnull, 'w') ) def _make_pathname(filename): """ Make filename path for text document, sorted like on arXiv servers. Parameters ---------- filename : str string filename of arXiv article (optional) Returns ------- pathname : str pathname in which to store the article following * Old ArXiv IDs: e.g. hep-ph0001001.txt returns DIR_PDFTARS/hep-ph/0001/hep-ph0001001.txt * New ArXiv IDs: e.g. 1501.13851.txt returns DIR_PDFTARS/arxiv/1501/1501.13851.txt """ basename = os.path.basename(filename) fname = os.path.splitext(basename)[0] if '.' in fname: # new style ArXiv ID yearmonth = fname.split('.')[0] return os.path.join(DIR_FULLTEXT, 'arxiv', yearmonth, basename) # old style ArXiv ID cat, aid = re.split(r'(\d+)', fname)[:2] yearmonth = aid[:4] return os.path.join(DIR_FULLTEXT, cat, yearmonth, basename) def process_tarfile_inner(filename, pdfnames=None, processes=1, dryrun=False, timelimit=fulltext.TIMELIMIT): outname = _tar_to_filename(filename) if not os.path.exists(outname): msg = 'Tarfile from manifest not found {}, skipping...'.format(outname) logger.error(msg) return # unpack tar file if pdfnames: namelist = ' '.join(pdfnames) cmd = 'tar --one-top-level -C {} -xf {} {}' cmd = cmd.format(DIR_PDFTARS, outname, namelist) else: cmd = 'tar --one-top-level -C {} -xf {}'.format(DIR_PDFTARS, outname) _call(cmd, dryrun) basename = os.path.splitext(os.path.basename(filename))[0] pdfdir = os.path.join(DIR_PDFTARS, basename, basename.split('_')[2]) # Run fulltext to convert pdfs in tardir into *.txt converts = fulltext.convert_directory_parallel( pdfdir, processes=processes, timelimit=timelimit ) # move txt into final file structure txtfiles = glob.glob('{}/*.txt'.format(pdfdir)) for tf in txtfiles: mvfn = _make_pathname(tf) dirname = os.path.dirname(mvfn) if not os.path.exists(dirname): _call('mkdir -p {}'.format(dirname), dryrun) if not dryrun: shutil.move(tf, mvfn) # clean up pdfs _call('rm -rf {}'.format(os.path.join(DIR_PDFTARS, basename)), dryrun) def process_tarfile(fileinfo, pdfnames=None, dryrun=False, debug=False, processes=1): """ Download and process one of the tar files from the ArXiv manifest. Download, unpack, and spawn the Docker image for converting pdf2text. It will only try to download the file if it does not already exist. The tar file will be stored in DIR_FULLTEXT/ and the resulting arXiv articles will be stored in the subdirectory DIR_FULLTEXT/arxiv//.txt for old style arXiv IDs and DIR_FULLTEXT///.txt for new style arXiv IDs. Parameters ---------- fileinfo : dict dictionary of file information from parse_manifest (optional) dryrun : bool If True, only log activity debug : bool Silence stderr of Docker _call if debug is False """ filename = fileinfo['filename'] md5sum = fileinfo['md5sum'] if check_if_any_processed(fileinfo): logger.info('Tar file appears processed, skipping {}...'.format(filename)) return logger.info('Processing tar "{}" ...'.format(filename)) process_tarfile_inner(filename, pdfnames=None, processes=processes, dryrun=dryrun) def process_manifest_files(list_of_fileinfo, processes=1, dryrun=False): """ Download PDFs from the ArXiv AWS S3 bucket and convert each pdf to text Parameters. If files are already downloaded, it will only process them. ---------- list_of_fileinfo : list Some elements of results of get_manifest (optional) processes : int number of paralell workers to spawn (roughly as many CPUs as you have) dryrun : bool If True, only log activity """ for fileinfo in list_of_fileinfo: process_tarfile(fileinfo, dryrun=dryrun, processes=processes) def check_if_any_processed(fileinfo): """ Spot check a tarfile to see if the pdfs have been converted to text, given an element of the s3 manifest """ first = _make_pathname(fileinfo['first_item']+'.txt') last = _make_pathname(fileinfo['last_item']+'.txt') return os.path.exists(first) and os.path.exists(last) def generate_tarfile_indices(manifest): """ Go through the manifest and for every tarfile, get a list of the PDFs that should be contained within it. This is a separate function because even checking the tars is rather slow. Returns ------- index : dictionary keys: tarfile, values: list of pdfs """ index = {} for fileinfo in manifest: name = fileinfo['filename'] logger.info("Indexing {}...".format(name)) tarname = os.path.join(DIR_PDFTARS, os.path.basename(name))+'.gz' files = [i for i in tarfile.open(tarname).getnames() if i.endswith('.pdf')] index[name] = files return index def check_missing_txt_files(index): """ Use the index file from `generate_tarfile_indices` to check which pdf->txt conversions are outstanding. """ missing = defaultdict(list) for tar, pdflist in index.items(): logger.info("Checking {}...".format(tar)) for pdf in pdflist: txt = _make_pathname(pdf).replace('.pdf', '.txt') if not os.path.exists(txt): missing[tar].append(pdf) return missing def rerun_missing(missing, processes=1): """ Use the output of `check_missing_txt_files` to attempt to rerun the text files which are missing from the conversion. There are various reasons that they can fail. """ sort = list(reversed( sorted([(k, v) for k, v in missing.items()], key=lambda x: len(x[1])) )) for tar, names in sort: logger.info("Running {} ({} to do)...".format(tar, len(names))) process_tarfile_inner( tar, pdfnames=names, processes=processes, timelimit=5 * fulltext.TIMELIMIT ) def process_missing(manifest, processes=1): """ Do the full process of figuring what is missing and running them """ indexfile = os.path.join(DIR_PDFTARS, 'manifest-index.json') if not os.path.exists(indexfile): index = generate_tarfile_indices(manifest) json.dump(index, open(indexfile, 'w')) index = json.load(open(indexfile)) missing = check_missing_txt_files(index) rerun_missing(missing, processes=processes)