Spaces:
Sleeping
Sleeping
import re | |
import time | |
import numpy as np | |
from jsonschema import validate | |
from nltk.tokenize import sent_tokenize | |
import json | |
class DocumentNormalizer:
    """Convert a raw paper record into the normalized paper dictionary.

    The input is a dict using keys such as ``authors``, ``title``, ``venue``,
    ``doi``, ``year``, ``abstract`` and ``pdf_parse`` (with ``abstract``,
    ``body_text`` and ``bib_entries``); this looks like the S2ORC layout.

    Every ``parse_*`` method is best-effort: malformed input yields an empty
    value instead of an exception, so a single bad field never aborts the
    normalization of the whole record.
    """

    def __init__(self, json_schema_path):
        """Load the JSON schema used by :meth:`normalize` and compile regexes.

        Args:
            json_schema_path: Path of the JSON schema file describing a
                normalized paper.
        """
        # Use a context manager so the file handle is closed deterministically.
        with open(json_schema_path, "r") as schema_file:
            self.json_schema = json.load(schema_file)
        # Matches a numeric citation marker such as "[12]" or "(3)": optional
        # non-alphanumeric prefix, the digits, optional non-alphanumeric tail.
        self.cit_marker_matcher = re.compile(
            r"(^[^A-Za-z\d]*)([0-9]+)(?=[^A-Za-z\d]*$)"
        )
        # A sentence boundary is a period followed by one whitespace character.
        self.sentence_boundary_matcher = re.compile(r"\.\s")

    @staticmethod
    def _as_text(value):
        """Return ``value`` as a string, mapping a missing value to ``""``.

        ``None`` and the exact text ``"None"`` (a value that was stringified
        upstream) both become the empty string.  Unlike a blanket
        ``replace("None", "")`` this keeps legitimate text such as
        "None of the Above" intact.
        """
        if value is None:
            return ""
        text = str(value)
        return "" if text.strip() == "None" else text

    @staticmethod
    def _pdf_parse(paper):
        """Return ``paper["pdf_parse"]``, or ``{}`` when it is missing/None."""
        return paper.get("pdf_parse") or {}

    def normalize(self, paper, requires_validation=True):
        """Build the normalized representation of ``paper``.

        Args:
            paper: Raw paper record (dict).
            requires_validation: When true, the result is validated against
                the JSON schema loaded in ``__init__``.

        Returns:
            The normalized paper dict, or ``None`` when validation was
            requested and did not succeed.
        """
        parsed_authors = self.parse_author(paper)
        parsed_title = self.parse_title(paper)
        parsed_venue = self.parse_venue(paper)
        parsed_doi = self.parse_doi(paper)
        parsed_url = self.parse_url(paper)
        parsed_pub_date = self.parse_pub_date(paper)
        # References come first: the content parser needs the mapping from
        # bibliography keys to row indices to resolve citation spans.
        parsed_reference, bib_entry_key_to_row_id_mapper = self.parse_reference(paper)
        parsed_content = self.parse_content(paper, bib_entry_key_to_row_id_mapper)

        # The metadata abstract is the flattened text of the parsed abstract
        # (section titles included).
        abstract_text = " ".join(
            self.get_sentence_list_from_parsed_sections(
                parsed_content["Abstract_Parsed"]
            )
        ).strip()

        normalized_paper = {
            "Author": parsed_authors,
            "Title": parsed_title,
            "Abstract": abstract_text,
            "Venue": parsed_venue,
            "DOI": parsed_doi,
            "URL": parsed_url,
            "PublicationDate": parsed_pub_date,
            "Content": parsed_content,
            "Reference": parsed_reference,
            "Last_update_unixtime": int(time.time()),
            "Abstract_in_metadata": abstract_text != "",
            "isDuplicated": False,
        }

        # Additional IDs, this is only added for S2ORC dataset.
        normalized_paper.update(self.parse_additional_ids(paper))

        if requires_validation:
            try:
                validate(instance=normalized_paper, schema=self.json_schema)
            except Exception:
                # Any validation failure means the record is unusable.
                return None
        return normalized_paper

    def get_sentence_list_from_parsed_sections(self, parsed_sections):
        """Flatten parsed sections into a list of strings.

        For each section the title is emitted first, followed by the text of
        every sentence of every paragraph, in document order.
        """
        sentence_list = []
        for section in parsed_sections:
            sentence_list.append(str(section.get("section_title", "")))
            for para in section.get("section_text", []):
                sentence_list.extend(
                    str(sen.get("sentence_text", ""))
                    for sen in para.get("paragraph_text", [])
                )
        return sentence_list

    def parse_author(self, paper):
        """Return the author list as ``{"GivenName", "FamilyName"}`` dicts.

        A missing or ``None`` name part becomes ``""``; it no longer discards
        the remaining authors.  Returns ``[]`` when the author data is
        malformed.
        """
        try:
            return [
                {
                    "GivenName": self._as_text(author.get("first", "")),
                    "FamilyName": self._as_text(author.get("last", "")),
                }
                for author in (paper.get("authors") or [])
            ]
        except Exception:
            return []

    def parse_title(self, paper):
        """Return the title with enclosing square brackets stripped."""
        try:
            return self._as_text(paper.get("title", "")).lstrip("[").rstrip("]")
        except Exception:
            return ""

    def parse_venue(self, paper):
        """Return the venue, falling back to ``journal`` when it is blank."""
        parsed_venue = ""
        for key in ("venue", "journal"):
            try:
                parsed_venue = self._as_text(paper.get(key, ""))
            except Exception:
                parsed_venue = ""
            if parsed_venue.strip() != "":
                break
        return parsed_venue

    def parse_doi(self, paper):
        """Return the DOI as a string (``""`` when absent)."""
        try:
            return self._as_text(paper.get("doi", ""))
        except Exception:
            return ""

    def parse_url(self, paper):
        """Return a ``https://doi.org/`` URL, or ``s2_url`` when there is no DOI."""
        try:
            parsed_doi = self._as_text(paper.get("doi", "")).strip()
            # "%" must be escaped first so the escapes added below survive.
            for raw, escaped in (
                ("%", "%25"),
                ('"', "%22"),
                ("#", "%23"),
                (" ", "%20"),
                ("?", "%3F"),
            ):
                parsed_doi = parsed_doi.replace(raw, escaped)
            if parsed_doi != "":
                return "https://doi.org/" + parsed_doi
            return self._as_text(paper.get("s2_url", ""))
        except Exception:
            return ""

    def parse_pub_date(self, paper):
        """Return ``{"Year": <str>}``; the year is ``""`` if not an integer."""
        try:
            year = str(int(paper.get("year", "")))
        except Exception:
            year = ""
        return {"Year": year}

    def parse_para(self, para, bib_entry_key_to_row_id_mapper):
        """Split a paragraph into sentences and attach its citation spans.

        Args:
            para: Dict with ``text`` and optional ``cite_spans``; each span has
                paragraph-relative ``start``/``end`` offsets and a ``ref_id``.
            bib_entry_key_to_row_id_mapper: Maps bibliography keys to row
                indices of the parsed reference list.

        Returns:
            A list of ``{"sentence_id", "sentence_text", "cite_spans"}`` dicts.
            Span offsets are sentence-relative and, like ``ref_id``, are
            emitted as strings.  Spans pointing to unknown references, spans
            overlapping an earlier span and spans lying in stripped trailing
            whitespace are dropped.
        """
        sentences = [
            {"sentence_text": str(sen), "cite_spans": []}
            for sen in self.sent_tok(self._as_text(para.get("text", "")))
        ]

        # The sentences concatenate back to the paragraph text (same length),
        # so a paragraph offset is converted by subtracting the length of
        # every sentence that precedes the one containing it.
        for cite_span in para.get("cite_spans") or []:
            start, end = cite_span["start"], cite_span["end"]
            for sen in sentences:
                sen_len = len(sen["sentence_text"])
                if start < sen_len:
                    sen["cite_spans"].append(
                        {
                            "start": start,
                            # A span crossing the boundary is clipped.
                            "end": min(end, sen_len),
                            "ref_id": cite_span.get("ref_id"),
                        }
                    )
                    break
                start -= sen_len
                end -= sen_len

        cleaned_paragraph_text = []
        for sen in sentences:
            sentence_text = sen["sentence_text"].rstrip()
            cleaned_cite_spans = []
            for span in sorted(sen["cite_spans"], key=lambda item: item["start"]):
                if span["ref_id"] not in bib_entry_key_to_row_id_mapper:
                    continue
                start, end = span["start"], span["end"]
                # Make sure there is no overlap between citation markers.
                if cleaned_cite_spans and start < int(cleaned_cite_spans[-1]["end"]):
                    continue
                if start >= len(sentence_text):
                    continue
                end = min(end, len(sentence_text))
                cleaned_cite_spans.append(
                    {
                        "start": str(start),
                        "end": str(end),
                        "text": sentence_text[start:end],
                        "ref_id": str(
                            bib_entry_key_to_row_id_mapper[span["ref_id"]]
                        ),
                    }
                )
            cleaned_paragraph_text.append(
                {
                    "sentence_id": str(len(cleaned_paragraph_text)),
                    "sentence_text": sentence_text,
                    "cite_spans": cleaned_cite_spans,
                }
            )
        return cleaned_paragraph_text

    def parse_para_list(self, para_list, bib_entry_key_to_row_id_mapper):
        """Group consecutive paragraphs into sections.

        A new section starts at the first paragraph and whenever a paragraph
        carries a non-empty ``section`` title different from the current one;
        paragraphs without a title join the current section.

        Returns:
            A list of ``{"section_id", "section_title", "section_text"}``
            dicts, or ``[]`` when the sections contain no text at all.
        """
        section_list = []
        current_section = None
        for para in para_list:
            paragraph_text = self.parse_para(para, bib_entry_key_to_row_id_mapper)
            para_section = self._as_text(para.get("section", ""))
            starts_new_section = current_section is None or (
                para_section != ""
                and para_section != current_section["section_title"]
            )
            if starts_new_section:
                if current_section is not None:
                    section_list.append(current_section)
                current_section = {
                    "section_id": str(len(section_list)),
                    "section_title": para_section,
                    "section_text": [],
                }
            current_section["section_text"].append(
                {
                    "paragraph_id": str(len(current_section["section_text"])),
                    "paragraph_text": paragraph_text,
                }
            )
        if current_section is not None:
            section_list.append(current_section)

        flattened = " ".join(self.get_sentence_list_from_parsed_sections(section_list))
        if flattened.strip() == "":
            section_list = []
        return section_list

    def parse_content(self, paper, bib_entry_key_to_row_id_mapper):
        """Parse abstract and body text into sections/paragraphs/sentences.

        The abstract comes from ``pdf_parse.abstract``; when that is empty the
        metadata ``abstract`` string is used as a single "Abstract" paragraph.
        The plain-text ``Abstract`` and ``Fullbody`` fields are always ``""``.
        """
        try:
            pdf_parsed_abstract = self._pdf_parse(paper).get("abstract") or []
            if len(pdf_parsed_abstract) == 0:
                abstract_text = self._as_text(paper.get("abstract", ""))
                if abstract_text != "":
                    pdf_parsed_abstract = [
                        {"section": "Abstract", "text": abstract_text}
                    ]
            abstract_parsed = self.parse_para_list(
                pdf_parsed_abstract, bib_entry_key_to_row_id_mapper
            )
        except Exception:
            abstract_parsed = []

        try:
            fullbody_parsed = self.parse_para_list(
                self._pdf_parse(paper).get("body_text") or [],
                bib_entry_key_to_row_id_mapper,
            )
        except Exception:
            fullbody_parsed = []

        return {
            "Abstract": "",
            "Abstract_Parsed": abstract_parsed,
            "Fullbody": "",
            "Fullbody_Parsed": fullbody_parsed,
        }

    def parse_reference(self, paper):
        """Parse the bibliography of ``paper``.

        Returns:
            A tuple ``(reference, bib_entry_key_to_row_id_mapper)``.
            ``reference`` is a list of metadata dicts, each with a
            ``ReferenceText`` citation string that is prefixed with the
            numeric in-text marker (e.g. ``"12."``) when one was found in the
            body.  The mapper gives the index in ``reference`` for every
            bibliography key that could be parsed.
        """
        # Step 1: recover the numeric marker used in the body for each ref.
        try:
            marker_source = {}
            for para in self._pdf_parse(paper).get("body_text") or []:
                for cit in para.get("cite_spans") or []:
                    if not isinstance(cit, dict):
                        continue
                    ref_id = cit.get("ref_id", "")
                    # Unlinked spans (no ref_id) can never match a bib key.
                    if ref_id is None or ref_id == "":
                        continue
                    marker_source[ref_id] = self._as_text(cit.get("text", ""))
            bibref_text = {}
            for ref_id, ref_text in marker_source.items():
                match = self.cit_marker_matcher.search(ref_text)
                bibref_text[ref_id] = match.group(2) + "." if match else ""
        except Exception:
            bibref_text = {}

        # Step 2: convert every bibliography entry, skipping unparsable ones.
        try:
            reference = []
            bib_entry_key_to_row_id_mapper = {}
            bib_entries = self._pdf_parse(paper).get("bib_entries") or {}
            try:
                # Order by the number after the 6-character key prefix
                # (presumably keys look like "BIBREF12").
                bib_entry_keys = sorted(bib_entries, key=lambda key: int(key[6:]))
            except (TypeError, ValueError):
                # Keys of another shape: keep the original order.
                bib_entry_keys = list(bib_entries)
            for bib_entry_key in bib_entry_keys:
                try:
                    parsed_entry = self.convert_bibentry_to_metadata(
                        bib_entries[bib_entry_key]
                    )
                    reference_text = self.get_citation_from_paper_metadata(
                        parsed_entry
                    )
                    marker = bibref_text.get(bib_entry_key, "")
                    if marker.strip() != "":
                        reference_text = marker + " " + reference_text
                    parsed_entry["ReferenceText"] = reference_text
                    bib_entry_key_to_row_id_mapper[bib_entry_key] = len(reference)
                    reference.append(parsed_entry)
                except Exception:
                    # A malformed entry is skipped; citations to it are
                    # dropped later because its key is not in the mapper.
                    continue
        except Exception:
            reference = []
            bib_entry_key_to_row_id_mapper = {}
        return reference, bib_entry_key_to_row_id_mapper

    def parse_additional_ids(self, paper):
        """Return the external identifiers of ``paper`` as strings."""
        id_fields = (
            ("S2CID", "paper_id"),
            ("PMID", "pubmed_id"),
            ("PMCID", "pmc_id"),
            ("ArxivId", "arxiv_id"),
            ("ACLId", "acl_id"),
            ("MAGId", "mag_id"),
        )
        try:
            return {
                out_key: self._as_text(paper.get(src_key, ""))
                for out_key, src_key in id_fields
            }
        except Exception:
            return {out_key: "" for out_key, _ in id_fields}

    def sent_tok(self, text, min_sen_len=10):
        """Split ``text`` into sentences of at least ``min_sen_len`` words.

        The text is cut after every period that is followed by a whitespace
        character; that separator is re-attached to the preceding fragment as
        ``". "`` so the fragments keep the original total length.  Short
        fragments are then merged by :meth:`merge_sens`.
        """
        fragments = self.sentence_boundary_matcher.split(text)
        fragments = [frag + ". " for frag in fragments[:-1]] + fragments[-1:]
        return self.merge_sens(fragments, min_sen_len=min_sen_len)

    def merge_sens(self, sens, min_sen_len=10):
        """Merge fragments shorter than ``min_sen_len`` words.

        A fragment with at least ``min_sen_len`` words starts a new sentence;
        a shorter fragment is appended to the sentence currently being built.
        Short fragments that precede the first long one form a sentence of
        their own.
        """
        out_sens = []
        current_sen = None
        for sen in sens:
            if current_sen is None:
                current_sen = sen
            elif len(sen.split()) >= min_sen_len:
                out_sens.append(current_sen)
                current_sen = sen
            else:
                current_sen += sen
        # Once a long fragment has been seen the pending sentence is never
        # short again, so it can always be emitted as is.
        if current_sen is not None:
            out_sens.append(current_sen)
        return out_sens

    def convert_bibentry_to_metadata(self, bibentry):
        """Convert one bibliography entry into a paper-metadata dict.

        ``None`` author names, venue and year become ``""`` so they do not end
        up as the text "None" in the citation or break its formatting.

        Raises:
            KeyError: If ``bibentry`` has no ``title`` key.
        """
        return {
            "Title": bibentry["title"],
            "Author": [
                {
                    "GivenName": self._as_text(author.get("first", "")),
                    "FamilyName": self._as_text(author.get("last", "")),
                }
                for author in bibentry.get("authors", [])
            ],
            "Venue": self._as_text(bibentry.get("venue", "")),
            "PublicationDate": {"Year": self._as_text(bibentry.get("year", ""))},
        }

    def get_citation_from_paper_metadata(self, paper_metadata):
        """Format paper metadata as a one-line citation string.

        Layout: ``Authors. \u201cTitle.\u201d Venue (Year).``  The first
        author is written "Family, Given", the others "Given Family"; more
        than three authors are abbreviated to "<first author> et al".  With no
        authors the author segment is just ".".
        """
        authors = paper_metadata.get("Author", [])
        title = paper_metadata.get("Title", "")
        venue = paper_metadata.get("Venue", "")
        year = paper_metadata.get("PublicationDate", {}).get("Year", "")

        author_list = []
        for pos, author_item in enumerate(authors):
            given = author_item.get("GivenName", "")
            family = author_item.get("FamilyName", "")
            if pos == 0:
                author_list.append("%s, %s" % (family, given))
            else:
                author_list.append("%s %s" % (given, family))

        if len(author_list) > 3:
            author_info = author_list[0] + " et al"
        elif len(author_list) > 1:
            author_info = ", ".join(author_list[:-1]) + ", and " + author_list[-1]
        elif len(author_list) == 1:
            author_info = author_list[0]
        else:
            author_info = ""
        author_info += "."

        # The title is wrapped in typographic double quotes.
        title_info = "\u201c" + title.rstrip(".") + ".\u201d"
        year_info = "(%s)" % (year) if year.strip() != "" else ""

        # split()/join collapses the gaps left by empty segments.
        parts = [author_info, title_info, venue, year_info]
        return " ".join(" ".join(parts).split()) + "."