import os import re import itertools import bs4 from bs4 import BeautifulSoup, NavigableString from typing import List, Dict, Tuple, Optional import copy import latex2mathml.converter from doc2json.grobid2json.grobid.grobid_client import GrobidClient from doc2json.utils.grobid_util import parse_bib_entry, get_author_data_from_grobid_xml from doc2json.s2orc import Paper, Paragraph SKIP_TAGS = { 'clearpage', 'colorpool', 'newpage', 'tableofcontents' } TEXT_TAGS = { 'p', 'proof', 'caption' } def normalize_latex_id(latex_id: str): str_norm = latex_id.upper().replace('_', '') if str_norm.startswith('BID'): return str_norm.replace('BID', 'BIBREF') if str_norm.startswith('CID'): return str_norm.replace('CID', 'SECREF') if str_norm.startswith('FORMULA'): return str_norm.replace('FORMULA', 'EQREF') return str_norm def process_author( author_text: str, grobid_client: GrobidClient, logfile: str ) -> List[Dict]: """ Process authors :param author_text: :param grobid_client: :param logfile: :return: """ if author_text: author_xml_str = grobid_client.process_header_names(author_text, logfile) if author_xml_str: author_soup = BeautifulSoup(author_xml_str, 'xml') author_entry = get_author_data_from_grobid_xml(author_soup) return author_entry return [{ "first": "", "middle": [], "last": author_text, "suffix": "", "affiliation": {}, "email": "" }] def process_bibentry(bib_text: str, grobid_client: GrobidClient, logfile: str): """ Process one bib entry text into title, authors, etc :param bib_text: :param grobid_client: :param logfile: :return: """ if not bib_text: return None bib_lines = bib_text.split('\n') bib_lines = [re.sub(r'\s+', ' ', line) for line in bib_lines] bib_lines = [re.sub(r'\s', ' ', line).strip() for line in bib_lines] bib_string = ' '.join(bib_lines) xml_str = grobid_client.process_citation(bib_string, logfile) if xml_str: soup = BeautifulSoup(xml_str, 'lxml') bib_entry = parse_bib_entry(soup) if not bib_entry['raw_text']: bib_entry['raw_text'] = bib_string return bib_entry return None def replace_ref_tokens(sp: BeautifulSoup, el: bs4.element.Tag, ref_map: Dict): """ Replace all references in element with special tokens :param sp: :param el: :param ref_map: :return: """ # replace all citations with cite keyword for cite in el.find_all('cit'): try: target = cite.ref.get('target').replace('bid', 'BIBREF') cite.replace_with(sp.new_string(f" {target} ")) except AttributeError: print('Attribute error: ', cite) continue # replace all non citation references for rtag in el.find_all('ref'): try: if rtag.get('target') and not rtag.get('target').startswith('bid'): if rtag.get('target').startswith('cid'): target = rtag.get('target').replace('cid', 'SECREF') elif rtag.get('target').startswith('uid'): if rtag.get('target').replace('uid', 'FIGREF') in ref_map: target = rtag.get('target').replace('uid', 'FIGREF') elif rtag.get('target').replace('uid', 'TABREF') in ref_map: target = rtag.get('target').replace('uid', 'TABREF') elif rtag.get('target').replace('uid', 'EQREF') in ref_map: target = rtag.get('target').replace('uid', 'EQREF') elif rtag.get('target').replace('uid', 'FOOTREF') in ref_map: target = rtag.get('target').replace('uid', 'FOOTREF') elif rtag.get('target').replace('uid', 'SECREFU') in ref_map: target = rtag.get('target').replace('uid', 'SECREFU') else: target = rtag.get('target').upper() else: print('Weird ID!') target = rtag.get('target').upper() rtag.replace_with(sp.new_string(f" {target} ")) except AttributeError: print('Attribute error: ', rtag) continue return el def process_list_el(sp: BeautifulSoup, list_el: bs4.element.Tag, section_info: List, bib_map: Dict, ref_map: Dict): """ Process list element :param sp: :param list_el: :param section_info: :param bib_map: :param ref_map: :return: """ # TODO: currently parsing list as a list of paragraphs (append numbers to start of each entry in ordered lists) list_items = [] for item in list_el.find_all('item'): # skip itemize settings if item.text.strip().startswith('[') and item.text.strip().endswith(']'): continue # try processing as paragraph list_num = item.get('id-text', None) item_as_para = process_paragraph(sp, item, section_info, bib_map, ref_map) # append list number if ordered if list_num: list_num_str = f'{list_num}. ' # iterate cite spans new_cite_spans = [] for span in item_as_para.cite_spans: new_cite_spans.append({ "start": span['start'] + len(list_num_str), "end": span['end'] + len(list_num_str), "text": span['text'] }) # iterate ref spans new_ref_spans = [] for span in item_as_para.ref_spans: new_ref_spans.append({ "start": span['start'] + len(list_num_str), "end": span['end'] + len(list_num_str), "text": span['text'] }) # iterate equation spans new_eq_spans = [] for span in item_as_para.eq_spans: new_eq_spans.append({ "start": span['start'] + len(list_num_str), "end": span['end'] + len(list_num_str), "text": span['text'], "latex": span['latex'], "ref_id": span['ref_id'] }) new_para = Paragraph( text=list_num_str + item_as_para.text, cite_spans=new_cite_spans, ref_spans=new_ref_spans, eq_spans=new_eq_spans, section=item_as_para.section ) else: new_para = item_as_para list_items.append(new_para) return list_items def process_navstring(str_el: NavigableString, section_info: List): """ Process one NavigableString :param sp: :param str_el: :param section_info: :param bib_map: :param ref_map: :return: """ # substitute space characters text = re.sub(r'\s+', ' ', str_el) text = re.sub(r'\s', ' ', text) # get all cite spans all_cite_spans = [] for span in re.finditer(r'(BIBREF\d+)', text): all_cite_spans.append({ "start": span.start(), "end": span.start() + len(span.group()), "ref_id": span.group() }) # get all ref spans all_ref_spans = [] for span in itertools.chain( re.finditer(r'(FIGREF\d+)', text), re.finditer(r'(TABREF\d+)', text), re.finditer(r'(EQREF\d+)', text), re.finditer(r'(FOOTREF\d+)', text), re.finditer(r'(SECREF\d+)', text), re.finditer(r'(SECREFU\d+)', text), ): all_ref_spans.append({ "start": span.start(), "end": span.start() + len(span.group()), "ref_id": span.group() }) # assert all align for cite_span in all_cite_spans: assert text[cite_span['start']:cite_span['end']] == cite_span['ref_id'] for ref_span in all_ref_spans: assert text[ref_span['start']:ref_span['end']] == ref_span['ref_id'] return Paragraph( text=text, cite_spans=all_cite_spans, ref_spans=all_ref_spans, eq_spans=[], section=section_info ) def process_paragraph(sp: BeautifulSoup, para_el: bs4.element.Tag, section_info: List, bib_map: Dict, ref_map: Dict): """ Process one paragraph :param sp: :param para_el: :param section_info: :param bib_map: :param ref_map: :return: """ # replace all ref tokens with special tokens para_el = replace_ref_tokens(sp, para_el, ref_map) # sub and get corresponding spans of inline formulas formula_dict = dict() inline_key_ind = 0 display_key_ind = 0 for ftag in para_el.find_all('formula'): try: # if formula has ref id, treat as display formula if ftag.get('id'): formula_key = f'DISPLAYFORM{display_key_ind}' ref_id = ftag.get('id').replace('uid', 'EQREF') display_key_ind += 1 # else, treat as inline else: formula_key = f'INLINEFORM{inline_key_ind}' ref_id = None inline_key_ind += 1 try: formula_mathml = latex2mathml.converter.convert(ftag.texmath.text) except Exception: formula_mathml = "" formula_dict[formula_key] = (ftag.math.text, ftag.texmath.text, formula_mathml, ref_id) ftag.replace_with(sp.new_string(f" {formula_key} ")) except AttributeError: continue # remove floats for fl in para_el.find_all('float'): print('Warning: still has !') fl.decompose() # remove notes for note in para_el.find_all('note'): print('Warning: still has !') note.decompose() # substitute space characters text = re.sub(r'\s+', ' ', para_el.text) text = re.sub(r'\s', ' ', text) # get all cite spans all_cite_spans = [] for span in re.finditer(r'(BIBREF\d+)', text): all_cite_spans.append({ "start": span.start(), "end": span.start() + len(span.group()), "text": bib_map[span.group()]['num'] if span.group() in bib_map else None, "ref_id": span.group() }) # get all ref spans all_ref_spans = [] for span in itertools.chain( re.finditer(r'(FIGREF\d+)', text), re.finditer(r'(TABREF\d+)', text), re.finditer(r'(EQREF\d+)', text), re.finditer(r'(FOOTREF\d+)', text), re.finditer(r'(SECREF\d+)', text), re.finditer(r'(SECREFU\d+)', text), ): all_ref_spans.append({ "start": span.start(), "end": span.start() + len(span.group()), "text": ref_map[span.group()]['num'] if span.group() in ref_map else None, "ref_id": span.group() }) # get all equation spans all_eq_spans = [] for span in itertools.chain( re.finditer(r'(INLINEFORM\d+)', text), re.finditer(r'(DISPLAYFORM\d+)', text) ): try: matching_formula = formula_dict[span.group()] all_eq_spans.append({ "start": span.start(), "end": span.start() + len(span.group()), "text": matching_formula[0], "latex": matching_formula[1], "mathml": matching_formula[2], "ref_id": span.group() }) except KeyError: continue # assert all align for cite_span in all_cite_spans: assert text[cite_span['start']:cite_span['end']] == cite_span['ref_id'] for ref_span in all_ref_spans: assert text[ref_span['start']:ref_span['end']] == ref_span['ref_id'] return Paragraph( text=text, cite_spans=all_cite_spans, ref_spans=all_ref_spans, eq_spans=all_eq_spans, section=section_info ) def decompose_tags_before_title(sp: BeautifulSoup): """ decompose all tags before title :param sp: :return: """ if sp.body.next.name == 'std': cld_tags = sp.std.find_all(recursive=False) if any([tag.name == 'maketitle' or tag.name == 'title' for tag in cld_tags]): for tag in sp.std: if type(tag) == bs4.element.Tag: if tag.name != 'maketitle' and tag.name != 'title': tag.decompose() else: break elif sp.body.next.name == 'unknown': cld_tags = sp.unknown.find_all(recursive=False) if any([tag.name == 'maketitle' or tag.name == 'title' for tag in cld_tags]): for tag in sp.std: if type(tag) == bs4.element.Tag: if tag.name != 'maketitle' and tag.name != 'title': tag.decompose() else: break else: print(f"Unknown inner tag: {sp.body.next.name}") return def process_metadata(sp: BeautifulSoup, grobid_client: GrobidClient, log_file: str) -> Tuple[str, List]: """ Process metadata section in soup :param sp: :param grobid_client: :param log_file: :return: """ title = "" authors = [] if not sp.maketitle and not sp.metadata: if sp.title: title = sp.title.text return title, authors else: return title, authors elif sp.maketitle: try: # process title title = sp.maketitle.title.text for formula in sp.author.find_all('formula'): formula.decompose() # process authors author_parts = [] for tag in sp.author: if type(tag) == NavigableString: author_parts.append(tag.strip()) else: author_parts.append(tag.text.strip()) author_parts = [re.sub(r'\s+', ' ', line) for line in author_parts] author_parts = [re.sub(r'\s', ' ', line).strip() for line in author_parts] author_parts = [part for part in author_parts if part.strip()] author_string = ', '.join(author_parts) authors = process_author(author_string, grobid_client, log_file) sp.maketitle.decompose() except AttributeError: sp.maketitle.decompose() return title, authors elif sp.metadata: try: # process title and authors from metadata title = sp.metadata.title.text # get authors for author in sp.authors: for subtag in author: subtag.decompose() if author.text.strip(): author_parts = author.text.strip().split() authors.append({ "first": author_parts[0] if len(author_parts) > 1 else "", "last": author_parts[-1] if author_parts[-1].lower() not in {"jr", "jr.", "iii", "iv", "v"} else author_parts[-2] if len(author_parts) > 1 else author_parts[-1], "middle": author_parts[1:-1], "suffix": "", "affiliation": {}, "email": "" }) sp.metadata.decompose() except AttributeError: sp.metadata.decompose() return title, authors return title, authors def process_bibliography_from_tex(sp: BeautifulSoup, client, log_file) -> Dict: """ Parse bibliography from latex :return: """ bibkey_map = dict() # replace Bibliography with bibliography if needed for bibl in sp.find_all("Bibliography"): bibl.name = 'bibliography' # construct bib map for bibliography in sp.find_all('bibliography'): bib_items = bibliography.find_all('bibitem') # map all bib entries if bib_items: for bi_num, bi in enumerate(bib_items): try: if not bi.get('id'): continue # get bib entry text and process it bib_par = bi.find_parent('p') if bib_par.text: bib_entry = process_bibentry(bib_par.text, client, log_file) else: next_tag = bib_par.findNext('p') if not next_tag.find('bibitem') and next_tag.text: bib_entry = process_bibentry(next_tag.text, client, log_file) else: bib_entry = None # if processed successfully, add to map if bib_entry: # get URLs from bib entry urls = [] for xref in bib_par.find_all('xref'): urls.append(xref.get('url')) bib_entry['urls'] = urls # map to ref id ref_id = normalize_latex_id(bi.get('id')) bib_entry['ref_id'] = ref_id bib_entry['num'] = bi_num bibkey_map[ref_id] = bib_entry except AttributeError: print('Attribute error in bib item!', bi) continue except TypeError: print('Type error in bib item!', bi) continue else: for bi_num, p in enumerate(sp.bibliography.find_all('p')): try: bib_key, bib_entry = None, None bib_text = p.text bib_name = re.match(r'\[(.*?)\](.*)', bib_text) if bib_name: bib_text = re.sub(r'\s', ' ', bib_text) bib_name = re.match(r'\[(.*?)\](.*)', bib_text) if bib_name: bib_key = bib_name.group(1) bib_entry = process_bibentry(bib_name.group(2), client, log_file) else: bib_lines = bib_text.split('\n') bib_key = re.sub(r'\s', ' ', bib_lines[0]) bib_text = re.sub(r'\s', ' ', ' '.join(bib_lines[1:])) bib_entry = process_bibentry(bib_text, client, log_file) if bib_key and bib_entry: # get URLs from bib entry urls = [] for xref in p.find_all('xref'): urls.append(xref.get('url')) bib_entry['urls'] = urls bib_entry['num'] = bi_num # map to bib id bibkey_map[bib_key] = bib_entry except AttributeError: print('Attribute error in bib item!', p) continue except TypeError: print('Type error in bib item!', p) continue for bibliography in sp.find_all('bibliography'): bibliography.decompose() return bibkey_map def get_section_name(sec): """ Get section name from div tag :param sec: :return: """ if sec.head: sec_text = sec.head.text else: sec_str = [] for tag in sec: if type(tag) == NavigableString: if len(tag.strip()) < 50: sec_str.append(tag.strip()) else: break elif tag.name != 'p': if len(tag.text.strip()) < 50: sec_str.append(tag.text.strip()) else: break else: break sec_text = ' '.join(sec_str).strip() return sec_text def get_sections_from_div(el: bs4.element.Tag, sp: BeautifulSoup, parent: Optional[str], faux_max: int) -> Dict: """ Process section headers for one div :param el: :param sp: :return: """ sec_map_dict = dict() el_ref_id = None # process divs with ids if el.get('id', None): sec_num = el.get('id-text', None) if 'cid' in el.get('id'): el_ref_id = el.get('id').replace('cid', 'SECREF') elif 'uid' in el.get('id'): el_ref_id = el.get('id').replace('uid', 'SECREFU') else: print('Unknown ID type!', el.get('id')) raise NotImplementedError el['s2orc_id'] = el_ref_id sec_map_dict[el_ref_id] = { "num": sec_num, "text": get_section_name(el), "ref_id": el_ref_id, "parent": parent } # process divs without section numbers elif el.get('rend') == "nonumber": el_ref_id = f'SECREF{faux_max}' el['s2orc_id'] = el_ref_id sec_map_dict[el_ref_id] = { "num": None, "text": get_section_name(el), "ref_id": el_ref_id, "parent": parent } # process sub elements for sub_el in el.find_all(recursive=False): if sub_el.name.startswith('div'): # add any unspecified keys sec_keys = [int(k.strip('SECREF')) for k in sec_map_dict.keys() if k and k.strip('SECREF').isdigit()] faux_max = max(sec_keys + [faux_max]) + 1 sec_map_dict.update( get_sections_from_div(sub_el, sp, el_ref_id if el_ref_id else parent, faux_max) ) elif sub_el.name == 'p' or sub_el.name == 'proof': if sub_el.get('id', None): sec_num = sub_el.get('id-text', sub_el.hi.get('id-text', None)) if 'cid' in sub_el.get('id'): sub_el_ref_id = sub_el.get('id').replace('cid', 'SECREF') elif 'uid' in sub_el.get('id'): sub_el_ref_id = sub_el.get('id').replace('uid', 'SECREFU') else: print('Unknown ID type!', sub_el.get('id')) raise NotImplementedError sub_el['s2orc_id'] = sub_el_ref_id sec_map_dict[el_ref_id] = { "num": sec_num, "text": sub_el.head.text if sub_el.head else sub_el.hi.text if sub_el.hi else "", "ref_id": sub_el_ref_id, "parent": el_ref_id if el_ref_id else parent } return sec_map_dict def process_sections_from_text(sp: BeautifulSoup) -> Dict: """ Generate section dict and replace with id tokens :param sp: :return: """ # initialize section_map = dict() max_above_1000 = 999 for div0 in sp.find_all('div0'): parent = None section_map.update(get_sections_from_div(div0, sp, parent, max_above_1000 + 1)) # add any unspecified keys sec_keys = [int(k.strip('SECREF')) for k in section_map.keys() if k and k.strip('SECREF').isdigit()] max_above_1000 = max(sec_keys + [max_above_1000]) + 1 return section_map def process_equations_from_tex(sp: BeautifulSoup) -> Dict: """ Generate equation dict and replace with id tokens :param sp: :return: """ equation_map = dict() for eq in sp.find_all('formula'): try: if eq.get('type', None) == 'display': if eq.get('id', None): ref_id = eq.get('id').replace('uid', 'EQREF') try: mathml = latex2mathml.converter.convert(eq.texmath.text.strip()) except Exception: mathml = "" equation_map[ref_id] = { "num": eq.get('id-text', None), "text": eq.math.text.strip(), "mathml": mathml, "latex": eq.texmath.text.strip(), "ref_id": ref_id } replace_item = sp.new_tag('p') equation_copy = copy.copy(eq) equation_copy['type'] = 'inline' replace_item.insert(0, equation_copy) # replace with

containing equation as inline eq.replace_with(replace_item) except AttributeError: continue return equation_map def process_footnotes_from_text(sp: BeautifulSoup) -> Dict: """ Process footnote marks :param sp: :return: """ footnote_map = dict() for note in sp.find_all('note'): try: if note.name and note.get('id'): # normalize footnote id ref_id = note.get('id').replace('uid', 'FOOTREF') # remove equation tex for eq in note.find_all('texmath'): eq.decompose() # replace all xrefs with link for xref in note.find_all('xref'): xref.replace_with(sp.new_string(f" {xref.get('url')} ")) # clean footnote text footnote_text = None if note.text: footnote_text = note.text.strip() footnote_text = re.sub(r'\s+', ' ', footnote_text) footnote_text = re.sub(r'\s', ' ', footnote_text) # form footnote entry footnote_map[ref_id] = { "num": note.get('id-text', None), "text": footnote_text, "ref_id": ref_id } note.replace_with(sp.new_string(f" {ref_id} ")) except AttributeError: continue return footnote_map def get_figure_map_from_tex(sp: BeautifulSoup) -> Dict: """ Generate figure dict only :param sp: :return: """ figure_map = dict() # get floats first because they are around figures for flt in sp.find_all('float'): try: if flt.name and flt.get('name') == 'figure': # get files fig_files = [] for fig in flt.find_all('figure'): if fig.get('file') and fig.get('extension'): fname = fig.get('file') + '.' + fig.get('extension') fig_files.append(fname) elif fig.get('file'): fname = fig.get('file') fig_files.append(fname) else: for subfig in fig.find_all('subfigure'): if subfig.get('file') and subfig.get('extension'): fig_files.append(subfig.get('file') + '.' + subfig.get('extension')) elif subfig.get('file'): fig_files.append(subfig.get('file')) if flt.get('id'): ref_id = flt.get('id').replace('uid', 'FIGREF') # form figmap entry figure_map[ref_id] = { "num": flt.get('id-text', None), "text": None, # placeholder "uris": fig_files, "ref_id": ref_id } except AttributeError: print('Attribute error with figure float: ', flt.name) continue for fig in sp.find_all('figure'): try: if fig.name and fig.get('id'): # normalize figure id ref_id = fig.get('id').replace('uid', 'FIGREF') # try to get filenames of figures fig_files = [] if fig.get('file') and fig.get('extension'): fname = fig.get('file') + '.' + fig.get('extension') fig_files.append(fname) elif fig.get('file'): fig_files.append(fig.get('file')) else: for subfig in fig.find_all('subfigure'): if subfig.get('file') and subfig.get('extension'): fig_files.append(subfig.get('file') + '.' + subfig.get('extension')) elif subfig.get('file'): fig_files.append(subfig.get('file')) # form figmap entry figure_map[ref_id] = { "num": fig.get('id-text', None), "text": None, # placeholder "uris": fig_files, "ref_id": ref_id } except AttributeError: print('Attribute error with figure: ', fig.name) continue return figure_map def process_figures_from_tex(sp: BeautifulSoup, ref_map: Dict) -> Dict: """ Add figure captions to fig_map and decompose :param sp: :param ref_map: :return: """ # process floats first because they are on the outside for flt in sp.find_all('float'): try: if flt.name and flt.get('name') == 'figure': if flt.get('id'): ref_id = flt.get('id').replace('uid', 'FIGREF') # remove equation tex for eq in flt.find_all('texmath'): eq.decompose() # clean caption text caption_text = None if flt.caption: flt = replace_ref_tokens(sp, flt, ref_map) caption_text = flt.caption.text.strip() caption_text = re.sub(r'\s+', ' ', caption_text) caption_text = re.sub(r'\s', ' ', caption_text) # form figmap entry ref_map[ref_id]['text'] = caption_text flt.decompose() except AttributeError: print('Attribute error with figure float: ', flt.name) continue for fig in sp.find_all('figure'): try: if fig.name and fig.get('id'): # normalize figure id ref_id = fig.get('id').replace('uid', 'FIGREF') # remove equation tex for eq in fig.find_all('texmath'): eq.decompose() # clean caption text caption_text = None if fig.text: fig = replace_ref_tokens(sp, fig, ref_map) caption_text = fig.text.strip() caption_text = re.sub(r'\s+', ' ', caption_text) caption_text = re.sub(r'\s', ' ', caption_text) # add text to figmap entry ref_map[ref_id]["text"] = caption_text except AttributeError: print('Attribute error with figure: ', fig.name) continue fig.decompose() return ref_map def convert_table_to_html(table_lst: List) -> str: if not table_lst: return '' html_str = '' for i, row in enumerate(table_lst): html_str += '' bottom_border = row.get('bottom-border') if i == 0 or bottom_border: for cell in row['cells']: html_str += f"" else: for cell in row['cells']: html_str += f"" html_str += '' html_str += '
{cell['text']}{cell['text']}
' return html_str def extract_table(table: BeautifulSoup) -> List: """ Extract table values from table entry :param table: :return: """ table_rep = [] for row in table.find_all('row'): cells = [] for cell in row.find_all('cell'): text_items = [] latex_items = [] for child in cell: if type(child) == NavigableString: text_items.append(str(child)) latex_items.append(str(child)) elif child.name == 'formula': text_items.append(child.math.text) latex_items.append(child.texmath.text) else: text_items.append(child.text) latex_items.append(child.text) text = ' '.join(text_items) text = re.sub(r'\s+', ' ', text) text = re.sub(r'\s', ' ', text) latex = ' '.join(latex_items) latex = re.sub(r'\s+', ' ', latex) cells.append({ "alignment": cell.get('halign'), "right-border": cell.get('right-border') == 'true', "left-border": cell.get('left-border') == 'true', "text": text.strip(), "latex": latex.strip() }) table_rep.append({ "top-border": row.get('top-border') == "true", "bottom-border": row.get('bottom-border') == "true", "cells": cells }) return table_rep def get_table_map_from_text(sp: BeautifulSoup, keep_table_contents=True) -> Dict: """ Generate table dict only :param sp: :param keep_table_contents: :return: """ table_map = dict() for flt in sp.find_all('float'): try: if flt.name and flt.get('name') == 'table': if flt.get('id'): # normalize table id ref_id = flt.get('id').replace('uid', 'TABREF') # get table content content = extract_table(flt) if keep_table_contents else None html = convert_table_to_html(content) if keep_table_contents else None # form tabmap entry table_map[ref_id] = { "num": flt.get('id-text', None), "text": None, # placeholder "content": content, "html": html, "ref_id": ref_id } for row in flt.find_all('row'): row.decompose() except AttributeError: print('Attribute error with table float: ', flt.name) continue for tab in sp.find_all('table'): try: # skip inline tables if tab.get('rend') == 'inline': continue # process them if tab.name and tab.get('id'): # normalize table id ref_id = tab.get('id').replace('uid', 'TABREF') # get table content content = extract_table(tab) if keep_table_contents else None html = convert_table_to_html(content) if keep_table_contents else None # form tabmap entry table_map[ref_id] = { "num": tab.get('id-text', None), "text": None, # placeholder "content": content, "html": html, "ref_id": ref_id } for row in tab.find_all('row'): row.decompose() except AttributeError: print('Attribute error with table: ', tab.name) continue return table_map def process_tables_from_tex(sp: BeautifulSoup, ref_map: Dict) -> Dict: """ Generate table dict and replace with id tokens :param sp: :param ref_map: :return: """ # process floats first because they are on the outside for flt in sp.find_all('float'): try: if flt.name and flt.get('name') == 'table': if flt.get('id'): # normalize table id ref_id = flt.get('id').replace('uid', 'TABREF') # remove equation tex if flt.caption: caption_el = replace_ref_tokens(sp, flt.caption, ref_map) for eq in caption_el.find_all('texmath'): eq.decompose() caption_text = caption_el.text.strip() elif flt.head: head_el = replace_ref_tokens(sp, flt.head, ref_map) for eq in head_el.find_all('texmath'): eq.decompose() caption_text = head_el.text.strip() elif flt.p: caption_parts = [] for tab_p in flt.find_all('p'): p_el = replace_ref_tokens(sp, tab_p, ref_map) for eq in p_el.find_all('texmath'): eq.decompose() caption_parts.append(p_el.text.strip()) caption_text = ' '.join(caption_parts) else: tab_el = replace_ref_tokens(sp, flt, ref_map) caption_text = tab_el.text.strip() if caption_text: caption_text = re.sub(r'\s+', ' ', caption_text) caption_text = re.sub(r'\s', ' ', caption_text) # form tabmap entry ref_map[ref_id]['text'] = caption_text flt.decompose() except AttributeError: print('Attribute error with table float: ', flt.name) continue for tab in sp.find_all('table'): try: # skip inline tables if tab.get('rend') == 'inline': continue # process them if tab.name and tab.get('id'): # normalize table id ref_id = tab.get('id').replace('uid', 'TABREF') # remove equation tex from caption and clean and resolve refs if tab.caption: caption_el = replace_ref_tokens(sp, tab.caption, ref_map) for eq in caption_el.find_all('texmath'): eq.decompose() caption_text = caption_el.text.strip() elif tab.head: head_el = replace_ref_tokens(sp, tab.head, ref_map) for eq in head_el.find_all('texmath'): eq.decompose() caption_text = head_el.text.strip() elif tab.p: caption_parts = [] for tab_p in tab.find_all('p'): p_el = replace_ref_tokens(sp, tab_p, ref_map) for eq in p_el.find_all('texmath'): eq.decompose() caption_parts.append(p_el.text.strip()) caption_text = ' '.join(caption_parts) else: tab_el = replace_ref_tokens(sp, tab, ref_map) caption_text = tab_el.text.strip() if caption_text: caption_text = re.sub(r'\s+', ' ', caption_text) caption_text = re.sub(r'\s', ' ', caption_text) # form tabmap entry ref_map[ref_id]['text'] = caption_text except AttributeError: print('Attribute error with table: ', tab.name) continue tab.decompose() return ref_map def combine_ref_maps(eq_map: Dict, fig_map: Dict, tab_map: Dict, foot_map: Dict, sec_map: Dict): """ Combine all items with ref ids into one map :param eq_map: :param fig_map: :param tab_map: :param sec_map: :return: """ ref_map = dict() for k, v in eq_map.items(): v['type'] = 'equation' ref_map[k] = v for k, v in fig_map.items(): v['type'] = 'figure' ref_map[k] = v for k, v in tab_map.items(): v['type'] = 'table' ref_map[k] = v for k, v in foot_map.items(): v['type'] = 'footnote' ref_map[k] = v for k, v in sec_map.items(): v['type'] = 'section' ref_map[k] = v return ref_map def collapse_formatting_tags(sp: BeautifulSoup): """ Collapse formatting tags like :param sp: :return: """ for hi in sp.find_all('hi'): hi.replace_with(f' {sp.new_string(hi.text.strip())} ') def process_abstract_from_tex(sp: BeautifulSoup, bib_map: Dict, ref_map: Dict) -> List[Dict]: """ Parse abstract from soup :param sp: :param bib_map: :param ref_map: :return: """ abstract_text = [] if sp.abstract: for p in sp.abstract.find_all('p'): abstract_text.append( process_paragraph(sp, p, [(None, "Abstract")], bib_map, ref_map) ) sp.abstract.decompose() else: if sp.std: p_tags = [tag for tag in sp.std if tag.name == 'p' and not tag.get('s2orc_id', None)] elif sp.unknown: p_tags = [tag for tag in sp.unknown if tag.name == 'p' and not tag.get('s2orc_id', None)] else: p_tags = None if p_tags: for p in p_tags: abstract_text.append( process_paragraph(sp, p, [(None, "Abstract")], bib_map, ref_map) ) p.decompose() return [para.__dict__ for para in abstract_text] def build_section_list(sec_id: str, ref_map: Dict) -> List[Tuple]: """ Build list of sections from reference map from sec_id using parent entry recursively :param sec_id: :param ref_map: :return: """ if not sec_id: return [] elif sec_id not in ref_map: return [] else: sec_entry = [(ref_map[sec_id]['num'], ref_map[sec_id]['text'])] if ref_map[sec_id]['parent'] == sec_id: return sec_entry else: return build_section_list(ref_map[sec_id]['parent'], ref_map) + sec_entry def get_seclist_for_el(el: bs4.element.Tag, ref_map: Dict, default_seclist: List) -> List[Tuple]: """ Build sec_list for tag :param el: :param ref_map: :param default_seclist: :return: """ if type(el) == NavigableString: return default_seclist sec_id = el.get('s2orc_id', None) if sec_id: return build_section_list(sec_id, ref_map) else: return default_seclist def process_div(tag: bs4.element.Tag, secs: List, sp: BeautifulSoup, bib_map: Dict, ref_map: Dict) -> List[Dict]: """ Process div recursively :param tag: :param secs: :param sp: :param bib_map: :param ref_map: :return: """ # iterate through children of this tag body_text = [] # navigable strings if type(tag) == NavigableString: return [] # skip these tags elif tag.name in SKIP_TAGS: return [] # process normal tags elif tag.name in TEXT_TAGS: if tag.text: body_text.append(process_paragraph(sp, tag, secs, bib_map, ref_map)) # process lists elif tag.name == 'list': if tag.text: body_text += process_list_el(sp, tag, secs, bib_map, ref_map) # process formula elif tag.name == 'formula': replace_item = sp.new_tag('p') tag_copy = copy.copy(tag) tag_copy['type'] = 'inline' replace_item.insert(0, tag_copy) tag.replace_with(replace_item) if tag.text: body_text.append(process_paragraph(sp, tag, secs, bib_map, ref_map)) # process divs elif tag.name.startswith('div'): for el in tag: # process tags if type(el) == bs4.element.Tag: el_sec_list = get_seclist_for_el(el, ref_map, secs) body_text += process_div(el, el_sec_list, sp, bib_map, ref_map) # unknown tag type, skip for now else: print(f'Unknown tag type: {tag.name}') return [] return body_text def process_body_text_from_tex(sp: BeautifulSoup, bib_map: Dict, ref_map: Dict) -> List[Dict]: """ Parse body text from tag recursively :param sp: :param bib_map: :param ref_map: :return: """ body_text = [] for tag in sp.body: # skip navigable string if type(tag) == NavigableString: continue else: sec_list = get_seclist_for_el(tag, ref_map, []) for cld in tag: # skip navigable string if type(tag) == NavigableString: continue else: sec_list = get_seclist_for_el(cld, ref_map, sec_list) if type(cld) == bs4.element.Tag: body_text += process_div(cld, sec_list, sp, bib_map, ref_map) # decompose everything sp.body.decompose() return [para.__dict__ for para in body_text] def convert_xml_to_s2orc( sp: BeautifulSoup, file_id: str, year_str: str, log_file: str, grobid_config: Optional[Dict]=None ) -> Paper: """ Convert a bunch of xml to gorc format :param sp: :param file_id: :param year_str: :param log_file: :param grobid_config: :return: """ # create grobid client client = GrobidClient(grobid_config) # TODO: not sure why but have to run twice decompose_tags_before_title(sp) decompose_tags_before_title(sp) # process maketitle info title, authors = process_metadata(sp, client, log_file) # processing of bibliography entries # TODO: look into why authors aren't processing bibkey_map = process_bibliography_from_tex(sp, client, log_file) # no bibliography entries if not bibkey_map: with open(log_file, 'a+') as bib_f: bib_f.write(f'{file_id},warn_no_bibs\n') # process section headers section_map = process_sections_from_text(sp) # process and replace non-inline equations equation_map = process_equations_from_tex(sp) # process footnote markers footnote_map = process_footnotes_from_text(sp) # get figure map figure_map = get_figure_map_from_tex(sp) # get table_map table_map = get_table_map_from_text(sp) # combine references in one dict refkey_map = combine_ref_maps(equation_map, figure_map, table_map, footnote_map, section_map) # process and replace figures refkey_map = process_figures_from_tex(sp, refkey_map) # process and replace tables refkey_map = process_tables_from_tex(sp, refkey_map) # collapse all hi tags collapse_formatting_tags(sp) # process abstract if possible abstract = process_abstract_from_tex(sp, bibkey_map, refkey_map) # process body text body_text = process_body_text_from_tex(sp, bibkey_map, refkey_map) # skip if no body text parsed if not body_text: with open(log_file, 'a+') as body_f: body_f.write(f'{file_id},warn_no_body\n') metadata = { "title": title, "authors": authors, "year": year_str, "venue": "", "identifiers": { "arxiv_id": file_id } } return Paper( paper_id=file_id, pdf_hash="", metadata=metadata, abstract=abstract, body_text=body_text, back_matter=[], bib_entries=bibkey_map, ref_entries=refkey_map ) def convert_latex_xml_to_s2orc_json(xml_fpath: str, log_dir: str, grobid_config: Optional[Dict]=None) -> Paper: """ :param xml_fpath: :param log_dir: :param grobid_config: :return: """ assert os.path.exists(xml_fpath) # get file id file_id = str(os.path.splitext(xml_fpath)[0]).split('/')[-1] # try to get year from file name year = file_id.split('.')[0][:2] if year.isdigit(): year = int(year) if year < 40: year += 2000 else: year += 1900 year = str(year) else: year = "" # log file log_file = os.path.join(log_dir, 'failed.log') with open(xml_fpath, 'r') as f: try: xml = f.read() soup = BeautifulSoup(xml, "lxml") paper = convert_xml_to_s2orc(soup, file_id, year, log_file, grobid_config=grobid_config) return paper except UnicodeDecodeError: with open(log_file, 'a+') as log_f: log_f.write(f'{file_id},err_unicode_decode\n') raise UnicodeDecodeError