memsum-arxiv-summarizer / services /normalization_utils.py
nianlonggu
init
02ae0bf
import re
import time
import numpy as np
from jsonschema import validate
from nltk.tokenize import sent_tokenize
import json
class DocumentNormalizer:
def __init__(self, json_schema_path ):
self.json_schema = json.load(open(json_schema_path,"r"))
self.cit_marker_matcher = re.compile("(^[^A-Za-z\d]*)([0-9]+)(?=[^A-Za-z\d]*$)")
self.sentence_boundary_matcher = re.compile("\.\s")
def normalize( self, paper, requires_validation = True ):
##### Author #####
parsed_authors = self.parse_author( paper )
##### Title #####
parsed_title = self.parse_title( paper )
##### Venue #####
parsed_venue = self.parse_venue( paper )
##### DOI #####
parsed_doi = self.parse_doi(paper)
##### URL #####
parsed_url = self.parse_url(paper)
##### PublicationDate #####
parsed_pub_date = self.parse_pub_date(paper)
##### Reference #####
parsed_reference, bib_entry_key_to_row_id_mapper = self.parse_reference(paper)
##### Content #####
parsed_content = self.parse_content(paper, bib_entry_key_to_row_id_mapper)
##### Abstract (The abstract text stored in the metadata) #####
abstract_text = (" ".join(self.get_sentence_list_from_parsed_sections( parsed_content["Abstract_Parsed"] ))).strip()
##### Last_update_unixtime ######
Last_update_unixtime = int(time.time())
##### Others #####
Abstract_in_metadata = abstract_text != ""
isDuplicated = False
normalized_paper = {
"Author":parsed_authors,
"Title":parsed_title,
"Abstract":abstract_text,
"Venue":parsed_venue,
"DOI":parsed_doi,
"URL":parsed_url,
"PublicationDate":parsed_pub_date,
"Content":parsed_content,
"Reference":parsed_reference,
"Last_update_unixtime":Last_update_unixtime,
"Abstract_in_metadata":Abstract_in_metadata,
"isDuplicated":isDuplicated
}
##### Additional IDs, this is only added for S2ORC dataset #####
additional_ids = self.parse_additional_ids(paper)
normalized_paper.update( additional_ids )
if requires_validation:
try:
validate(instance=normalized_paper, schema=self.json_schema)
except:
return None
return normalized_paper
def get_sentence_list_from_parsed_sections(self, parsed_sections ):
sentence_list = []
for section in parsed_sections:
sentence_list.append(str(section.get( "section_title", "" )))
for para in section.get("section_text",[]):
for sen in para.get("paragraph_text", []):
sentence_list.append( str(sen.get("sentence_text","")) )
return sentence_list
def parse_author(self, paper ):
try:
parsed_authors = []
authors = paper.get("authors", [] )
for author in authors:
parsed_authors.append(
{
"GivenName":str( author.get( "first", "" ).replace("None","") ),
"FamilyName":str( author.get( "last", "" ).replace("None","") )
}
)
except:
parsed_authors = []
return parsed_authors
def parse_title(self, paper ):
try:
parsed_title = str(paper.get("title", "")).replace("None","").lstrip("[").rstrip("]")
except:
parsed_title = ""
return parsed_title
def parse_venue(self, paper):
try:
parsed_venue = str(paper.get("venue", "")).replace("None","")
except:
parsed_venue = ""
if parsed_venue.strip() == "":
try:
parsed_venue = str(paper.get("journal","")).replace("None","")
except:
parsed_venue = ""
return parsed_venue
def parse_doi(self, paper):
try:
parsed_doi = str( paper.get("doi","") ).replace("None","")
except:
parsed_doi = ""
return parsed_doi
def parse_url(self, paper):
try:
parsed_doi = str(paper.get("doi","")).strip().replace("%", "%25").replace('"', "%22").replace("#", "%23").replace(" ", "%20").replace("?", "%3F").replace("None","")
if parsed_doi.strip() != "":
parsed_url = "https://doi.org/" + parsed_doi
else:
parsed_url = str(paper.get("s2_url", ""))
except:
parsed_url = ""
return parsed_url
def parse_pub_date( self, paper ):
try:
year = str(int(paper.get("year", ""))).replace("None","")
except:
year = ""
return {
"Year":year
}
def parse_para( self, para, bib_entry_key_to_row_id_mapper ):
paragraph_text = [{ "sentence_id":str(sen_id), "sentence_text": str(sen), "cite_spans":[] }
for sen_id, sen in enumerate(self.sent_tok( str(para.get("text",""))) )]
para_cite_spans = para.get( "cite_spans", [] )
for cite_span in para_cite_spans:
start, end = cite_span["start"], cite_span["end"]
for sen in paragraph_text:
if start < len( sen["sentence_text"] ):
end = min( end, len( sen["sentence_text"] ) )
sen["cite_spans"].append(
{
"start":start,
"end":end,
"text":sen["sentence_text"][start:end],
"ref_id":cite_span["ref_id"]
}
)
break
else:
start -= len( sen["sentence_text"] )
end -= len( sen["sentence_text"] )
cleaned_paragraph_text = []
for sen in paragraph_text:
sentence_text = sen["sentence_text"]
cite_spans = sen["cite_spans"]
sentence_text = sentence_text.rstrip()
cite_spans.sort( key= lambda x:x["start"] )
cleaned_cite_spans = []
for sen_cite_span in cite_spans:
if sen_cite_span["ref_id"] not in bib_entry_key_to_row_id_mapper:
continue
start, end = sen_cite_span["start"], sen_cite_span["end"]
## make sure ther is no overlapping between multiple citation markers
if len(cleaned_cite_spans) > 0 and start < int(cleaned_cite_spans[-1]["end"]):
continue
if start >= len(sentence_text):
continue
end = min( end, len(sentence_text) )
sen_cite_span["start"] = str(start)
sen_cite_span["end"] = str(end)
sen_cite_span["text"] = sentence_text[start:end]
sen_cite_span["ref_id"] = str(bib_entry_key_to_row_id_mapper[ sen_cite_span["ref_id"] ])
cleaned_cite_spans.append( sen_cite_span )
sentence_id = str(len(cleaned_paragraph_text))
cleaned_paragraph_text.append(
{
"sentence_id":sentence_id,
"sentence_text":sentence_text,
"cite_spans":cleaned_cite_spans
}
)
return cleaned_paragraph_text
def parse_para_list( self, para_list, bib_entry_key_to_row_id_mapper ):
section_list = []
current_section = None
for para in para_list:
paragraph_text = self.parse_para( para, bib_entry_key_to_row_id_mapper )
para_section = str(para.get("section",""))
if current_section is None or (para_section != "" and para_section != current_section["section_title"]):
if current_section is not None:
section_list.append(current_section)
current_section = {
"section_id":str(len(section_list)),
"section_title":para_section,
"section_text":[
{
"paragraph_id":"0",
"paragraph_text":paragraph_text
}
]
}
else:
next_para_id = len(current_section["section_text"])
current_section["section_text"].append(
{
"paragraph_id":str(next_para_id),
"paragraph_text":paragraph_text
}
)
if current_section is not None:
section_list.append(current_section)
if (" ".join(self.get_sentence_list_from_parsed_sections( section_list ))).strip() == "":
section_list = []
return section_list
def parse_content( self, paper, bib_entry_key_to_row_id_mapper ):
### Abstract
abstract = ""
### Abstract_Parsed
try:
pdf_parsed_abstract = paper.get("pdf_parse",{}).get("abstract",[])
if len( pdf_parsed_abstract ) == 0:
abstract_text = str(paper.get("abstract",""))
if abstract_text != "None" and abstract_text != "":
pdf_parsed_abstract = [ { "section":"Abstract", "text":abstract_text } ]
assert len(pdf_parsed_abstract) > 0
abstract_parsed = self.parse_para_list( pdf_parsed_abstract, bib_entry_key_to_row_id_mapper )
except:
abstract_parsed = []
### Fullbody
fullbody = ""
### Fullbody_Parsed
try:
fullbody_parsed = self.parse_para_list( paper.get( "pdf_parse", {} ).get("body_text", []), bib_entry_key_to_row_id_mapper )
except:
fullbody_parsed = []
return {
"Abstract":abstract,
"Abstract_Parsed":abstract_parsed,
"Fullbody":fullbody,
"Fullbody_Parsed":fullbody_parsed
}
def parse_reference(self, paper):
try:
bibref_text = {}
body_text = paper.get("pdf_parse",{}).get("body_text", [])
for para in body_text:
for cit in para.get("cite_spans", []):
if isinstance(cit, dict):
ref_id, ref_text = cit.get("ref_id",""), cit.get("text","")
if ref_id != "":
bibref_text[ref_id] = ref_text
for ref_id in bibref_text:
ref_text = bibref_text[ref_id]
matched_texts = self.cit_marker_matcher.findall(ref_text)
if len(matched_texts) > 0:
ref_text = matched_texts[0][1]+"."
else:
ref_text = ""
bibref_text[ref_id] = ref_text
except:
bibref_text = {}
try:
reference = []
bib_entry_key_to_row_id_mapper = {}
bib_entries = paper.get("pdf_parse",{}).get("bib_entries",{})
bib_entry_keys = list(bib_entries.keys())
try:
bib_entry_keys.sort( key = lambda x : int(x[6:]) )
except:
pass
for bib_entry_key in bib_entry_keys:
try:
parsed_entry = self.convert_bibentry_to_metadata( bib_entries[bib_entry_key] )
reference_text = self.get_citation_from_paper_metadata(parsed_entry)
if bibref_text.get(bib_entry_key,"").strip() != "":
reference_text = bibref_text[bib_entry_key] + " "+ reference_text
parsed_entry["ReferenceText"] = reference_text
bib_entry_key_to_row_id_mapper[bib_entry_key] = len(reference)
reference.append(parsed_entry)
except:
continue
except:
reference = []
bib_entry_key_to_row_id_mapper = {}
return reference, bib_entry_key_to_row_id_mapper
def parse_additional_ids(self, paper):
try:
S2CID = str(paper.get("paper_id", "")).replace("None","")
PMID = str(paper.get("pubmed_id", "")).replace("None","")
PMCID = str(paper.get("pmc_id", "")).replace("None","")
ArxivId = str(paper.get("arxiv_id", "")).replace("None","")
ACLId = str(paper.get("acl_id","")).replace("None","")
MAGId = str(paper.get("mag_id","")).replace("None","")
except:
S2CID = ""
PMID = ""
PMCID = ""
ArxivId = ""
ACLId = ""
MAGId = ""
return {
"S2CID":S2CID,
"PMID":PMID,
"PMCID":PMCID,
"ArxivId":ArxivId,
"ACLId":ACLId,
"MAGId":MAGId
}
def sent_tok(self, text, min_sen_len = 10 ):
sens = self.sentence_boundary_matcher.split( text )
for pos in range( len(sens)-1 ):
sens[pos] += ". "
return self.merge_sens( sens, min_sen_len = min_sen_len )
def merge_sens(self, sens, min_sen_len = 10 ):
out_sens =[]
current_sen = None
for sen in sens:
sen_len = len(sen.split())
if sen_len >= min_sen_len:
if current_sen is not None:
out_sens.append( current_sen )
current_sen = sen
else:
if current_sen is not None:
current_sen += sen
else:
current_sen = sen
if current_sen is not None:
if len( current_sen.split() ) < min_sen_len and len( out_sens ) > 0:
out_sens[-1] += current_sen
else:
out_sens.append(current_sen)
return out_sens
def convert_bibentry_to_metadata(self, bibentry):
metadata = {}
metadata["Title"] = bibentry["title"]
metadata["Author"] = []
for author in bibentry.get("authors",[]):
metadata["Author"].append({
"GivenName":author.get("first",""),
"FamilyName": author.get("last", "")
})
metadata["Venue"] = bibentry.get("venue","")
metadata["PublicationDate"] = {"Year":str( bibentry.get("year","") )}
return metadata
def get_citation_from_paper_metadata(self, paper_metadata ):
author = paper_metadata.get("Author",[])
title = paper_metadata.get("Title","")
venue = paper_metadata.get("Venue","")
year = paper_metadata.get("PublicationDate",{}).get("Year","")
author_list = []
for pos,author_item in enumerate(author):
if pos == 0:
author_list.append( "%s, %s"%( author_item.get("FamilyName",""), author_item.get("GivenName","") ) )
else:
author_list.append( "%s %s"%( author_item.get("GivenName",""), author_item.get("FamilyName","") ) )
if len(author_list)>3:
author_info = author_list[0] + " et al"
elif len(author_list)>1:
author_info = ", ".join( author_list[:-1] ) + ", and " + author_list[-1]
elif len(author_list)==1:
author_info = author_list[0]
else:
author_info = ""
author_info += "."
title_info = "β€œ"+title.rstrip(".")+".”"
journal_info = venue
if year.strip() != "":
year_info = "(%s)"%(year)
else:
year_info = ""
citation_text = " ".join(" ".join( [author_info, title_info, journal_info, year_info ] ).split()) +"."
return citation_text