Spaces:
Build error
Build error
from arxiv_public_data.fulltext import convert_directory_parallel | |
from arxiv_public_data import internal_citations | |
import torch | |
import os | |
from summarizer import Summarizer | |
from sentence_transformers import SentenceTransformer | |
import spacy | |
import numpy as np | |
from keybert import KeyBERT | |
import shutil, joblib | |
from distutils.dir_util import copy_tree | |
try: | |
from transformers import * | |
except: | |
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoConfig, AutoModel, LEDTokenizer, \ | |
LEDForConditionalGeneration | |
from src.defaults import DEFAULTS_CPU_COMPAT, DEFAULTS_HIGH_GPU | |
class Surveyor: | |
''' | |
A class to abstract all nlp and data mining helper functions as well as workflows | |
required to generate the survey from a single query, with absolute configurability | |
''' | |
def __init__( | |
self, | |
pdf_dir=None, | |
txt_dir=None, | |
img_dir=None, | |
tab_dir=None, | |
dump_dir=None, | |
models_dir=None, | |
title_model_name=None, | |
ex_summ_model_name=None, | |
ledmodel_name=None, | |
embedder_name=None, | |
nlp_name=None, | |
similarity_nlp_name=None, | |
kw_model_name=None, | |
high_gpu=False, | |
refresh_models=False, | |
no_save_models=False, | |
print_fn=None, | |
survey_print_fn=None | |
): | |
''' | |
Initializes models and directory structure for the surveyor | |
Optional Params: | |
- pdf_dir: String, pdf paper storage directory - defaults to arxiv_data/tarpdfs/ | |
- txt_dir: String, text-converted paper storage directory - defaults to arxiv_data/fulltext/ | |
- img_dir: String, image image storage directory - defaults to arxiv_data/images/ | |
- tab_dir: String, tables storage directory - defaults to arxiv_data/tables/ | |
- dump_dir: String, all_output_dir - defaults to arxiv_dumps/ | |
- models_dir: String, directory to save to huge models | |
- title_model_name: String, title model name/tag in hugging-face, defaults to `Callidior/bert2bert-base-arxiv-titlegen` | |
- ex_summ_model_name: String, extractive summary model name/tag in hugging-face, defaults to `allenai/scibert_scivocab_uncased` | |
- ledmodel_name: String, led model(for abstractive summary) name/tag in hugging-face, defaults to `allenai/led-large-16384-arxiv` | |
- embedder_name: String, sentence embedder name/tag in hugging-face, defaults to `paraphrase-MiniLM-L6-v2` | |
- nlp_name: String, spacy model name/tag in hugging-face (if changed - needs to be spacy-installed prior), defaults to `en_core_sci_scibert` | |
- similarity_nlp_name: String, spacy downstream trained model(for similarity) name/tag in hugging-face (if changed - needs to be spacy-installed prior), defaults to `en_core_sci_lg` | |
- kw_model_name: String, keyword extraction model name/tag in hugging-face, defaults to `distilbert-base-nli-mean-tokens` | |
- high_gpu: Bool, High GPU usage permitted, defaults to False | |
- refresh_models: Bool, Refresh model downloads with given names (needs atleast one model name param above), defaults to False | |
- no_save_models: forces refresh models | |
- max_search: int maximium number of papers to gaze at - defaults to 100 | |
- num_papers: int maximium number of papers to download and analyse - defaults to 25 | |
''' | |
self.print_fn = print | |
if print_fn is not None: | |
self.print_fn = print_fn | |
self.survey_print_fn = self.print_fn | |
if survey_print_fn is not None: | |
self.survey_print_fn = survey_print_fn | |
self.torch_device = 'cpu' | |
self.print_fn("\nTorch_device: " + self.torch_device) | |
if torch.cuda.is_available(): | |
self.torch_device = 'cuda' | |
spacy.require_gpu() | |
self.high_gpu = high_gpu | |
#self.DEFAULTS = DEFAULTS_CPU_COMPAT | |
#if self.high_gpu: | |
self.DEFAULTS = DEFAULTS_HIGH_GPU | |
if not kw_model_name: | |
kw_model_name = self.DEFAULTS["kw_model_name"] | |
self.num_papers = self.DEFAULTS['num_papers'] | |
self.max_search = self.DEFAULTS['max_search'] | |
if not models_dir: | |
models_dir = self.DEFAULTS['models_dir'] | |
models_found = False | |
if os.path.exists(models_dir) and not no_save_models: | |
if len(os.listdir(models_dir)) > 6: | |
models_found = True | |
if not title_model_name: | |
title_model_name = self.DEFAULTS["title_model_name"] | |
if not ex_summ_model_name: | |
ex_summ_model_name = self.DEFAULTS["ex_summ_model_name"] | |
if not ledmodel_name: | |
ledmodel_name = self.DEFAULTS["ledmodel_name"] | |
if not embedder_name: | |
embedder_name = self.DEFAULTS["embedder_name"] | |
if not nlp_name: | |
nlp_name = self.DEFAULTS["nlp_name"] | |
if not similarity_nlp_name: | |
similarity_nlp_name = self.DEFAULTS["similarity_nlp_name"] | |
if refresh_models or not models_found: | |
self.print_fn(f'\nInitializing models {"and saving (about 5GB)" if not no_save_models else ""}') | |
if not no_save_models: | |
self.clean_dirs([models_dir]) | |
self.title_tokenizer = AutoTokenizer.from_pretrained(title_model_name, trust_remote_code=True) | |
self.title_model = AutoModelForSeq2SeqLM.from_pretrained(title_model_name, trust_remote_code=True).to(self.torch_device) | |
self.title_model.eval() | |
if not no_save_models: | |
self.title_model.save_pretrained(models_dir + "/title_model") | |
#self.title_tokenizer.save_pretrained(models_dir + "/title_tokenizer") | |
# summary model | |
self.custom_config = AutoConfig.from_pretrained(ex_summ_model_name) | |
self.custom_config.output_hidden_states = True | |
self.summ_tokenizer = AutoTokenizer.from_pretrained(ex_summ_model_name) | |
self.summ_model = AutoModel.from_pretrained(ex_summ_model_name, config=self.custom_config).to( | |
self.torch_device) | |
self.summ_model.eval() | |
if not no_save_models: | |
self.summ_model.save_pretrained(models_dir + "/summ_model") | |
#self.summ_tokenizer.save_pretrained(models_dir + "/summ_tokenizer") | |
self.model = Summarizer(custom_model=self.summ_model, custom_tokenizer=self.summ_tokenizer) | |
if 'led' in ledmodel_name: | |
self.ledtokenizer = LEDTokenizer.from_pretrained(ledmodel_name) | |
self.ledmodel = LEDForConditionalGeneration.from_pretrained(ledmodel_name).to(self.torch_device) | |
elif 't5' in ledmodel_name: | |
self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) | |
self.ledmodel = T5ForConditionalGeneration.from_pretrained(ledmodel_name).to(self.torch_device) | |
elif 'bart' in ledmodel_name: | |
self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) | |
self.ledmodel = BartForConditionalGeneration.from_pretrained(ledmodel_name).to(self.torch_device) | |
self.ledmodel.eval() | |
if not no_save_models: | |
self.ledmodel.save_pretrained(models_dir + "/ledmodel") | |
#self.ledtokenizer.save_pretrained(models_dir + "/ledtokenizer") | |
self.embedder = SentenceTransformer(embedder_name) | |
self.embedder.eval() | |
if not no_save_models: | |
self.embedder.save(models_dir + "/embedder") | |
else: | |
self.print_fn("\n-Initializing from previously saved models at" + models_dir) | |
self.title_tokenizer = AutoTokenizer.from_pretrained(title_model_name) | |
self.title_model = AutoModelForSeq2SeqLM.from_pretrained(models_dir + "/title_model").to(self.torch_device) | |
self.title_model.eval() | |
# summary model | |
#self.summ_config = AutoConfig.from_pretrained(ex_summ_model_name) | |
#self.summ_config.output_hidden_states = True | |
self.summ_tokenizer = AutoTokenizer.from_pretrained(ex_summ_model_name) | |
self.summ_model = AutoModel.from_pretrained(models_dir + "/summ_model").to( | |
self.torch_device) | |
self.summ_model.eval() | |
self.model = Summarizer(custom_model=self.summ_model, custom_tokenizer=self.summ_tokenizer) | |
if 'led' in ledmodel_name: | |
self.ledtokenizer = LEDTokenizer.from_pretrained(ledmodel_name) | |
self.ledmodel = LEDForConditionalGeneration.from_pretrained(models_dir + "/ledmodel").to(self.torch_device) | |
elif 't5' in ledmodel_name: | |
self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) | |
self.ledmodel = T5ForConditionalGeneration.from_pretrained(models_dir + "/ledmodel").to(self.torch_device) | |
elif 'bart' in ledmodel_name: | |
self.ledtokenizer = AutoTokenizer.from_pretrained(ledmodel_name) | |
self.ledmodel = BartForConditionalGeneration.from_pretrained(models_dir + "/ledmodel").to(self.torch_device) | |
self.ledmodel.eval() | |
self.embedder = SentenceTransformer(models_dir + "/embedder") | |
self.embedder.eval() | |
self.nlp = spacy.load(nlp_name) | |
self.similarity_nlp = spacy.load(similarity_nlp_name) | |
self.kw_model = KeyBERT(kw_model_name) | |
self.define_structure(pdf_dir=pdf_dir, txt_dir=txt_dir, img_dir=img_dir, tab_dir=tab_dir, dump_dir=dump_dir) | |
def define_structure(self, pdf_dir=None, txt_dir=None, img_dir=None, tab_dir=None, dump_dir=None): | |
if pdf_dir: | |
self.pdf_dir = pdf_dir | |
else: | |
self.pdf_dir = self.DEFAULTS["pdf_dir"] | |
if txt_dir: | |
self.txt_dir = txt_dir | |
else: | |
self.txt_dir = self.DEFAULTS["txt_dir"] | |
if img_dir: | |
self.img_dir = img_dir | |
else: | |
self.img_dir = self.DEFAULTS["img_dir"] | |
if tab_dir: | |
self.tab_dir = tab_dir | |
else: | |
self.tab_dir = self.DEFAULTS["tab_dir"] | |
if dump_dir: | |
self.dump_dir = dump_dir | |
else: | |
self.dump_dir = self.DEFAULTS["dump_dir"] | |
dirs = [self.pdf_dir, self.txt_dir, self.img_dir, self.tab_dir, self.dump_dir] | |
if sum([True for dir in dirs if 'arxiv_data/' in dir]): | |
base = os.path.dirname("arxiv_data/") | |
if not os.path.exists(base): | |
os.mkdir(base) | |
self.clean_dirs(dirs) | |
def clean_dirs(self, dirs): | |
import shutil | |
for d in dirs: | |
if os.path.exists(d): | |
shutil.rmtree(d) | |
os.mkdir(d) | |
def pdf_route(self, pdf_dir, txt_dir, img_dir, tab_dir, dump_dir, papers_meta): | |
## Data prep | |
import joblib | |
# test full again - check images - check dfs !! | |
self.clean_dirs([pdf_dir, txt_dir, img_dir, tab_dir, dump_dir]) | |
papers = papers_meta[:self.num_papers] | |
selected_papers = papers | |
self.print_fn("\n-First stage paper collection...") | |
ids_none, papers, cites = self.fetch_papers(dump_dir, img_dir, papers, pdf_dir, tab_dir, txt_dir) | |
self.print_fn("\n-First stage paper collection complete, papers collected: \n" + ', '.join([p['id'] for p in papers])) | |
new_papers = papers_meta[self.num_papers : self.num_papers + len(ids_none)] | |
_ = self.get_freq_cited(cites) | |
''' | |
filtered_idlist = [] | |
for c in self.get_freq_cited(cites): | |
if c in | |
_, new_searched_papers = self.search(filtered_idlist) | |
new_papers.extend(new_searched_papers) | |
''' | |
selected_papers.extend(new_papers) | |
self.print_fn("\n-Second stage paper collection...") | |
_, new_papers, _ = self.fetch_papers(dump_dir, img_dir, new_papers, pdf_dir, tab_dir, txt_dir, repeat=True) | |
self.print_fn("\n-Second stage paper collection complete, new papers collected: \n" + ', '.join([p['id'] for p in new_papers])) | |
papers.extend(new_papers) | |
joblib.dump(papers, dump_dir + 'papers_extracted_pdf_route.dmp') | |
copy_tree(img_dir, dump_dir + os.path.basename(img_dir)) | |
copy_tree(tab_dir, dump_dir + os.path.basename(tab_dir)) | |
self.print_fn("\n-Extracting section-wise highlights.. ") | |
papers = self.extract_highlights(papers) | |
return papers, selected_papers | |
def get_freq_cited(self, cites_dict, k=5): | |
cites_list = [] | |
for k, v in cites_dict.items(): | |
cites_list.append(k) | |
[cites_list.append(val) for val in v] | |
cite_freqs = {cite: cites_list.count(cite) for cite in set(cites_list)} | |
sorted_cites = dict(sorted(cite_freqs.items(), key=lambda item: item[1], reverse=True)[:5]) | |
self.print_fn("\n-The most cited paper ids are:\n" + str(sorted_cites)) | |
return sorted_cites.keys() | |
def fetch_papers(self, dump_dir, img_dir, papers, pdf_dir, tab_dir, txt_dir, repeat=False): | |
import tempfile | |
if repeat: | |
with tempfile.TemporaryDirectory() as dirpath: | |
self.print_fn("\n- downloading extra pdfs.. ") | |
# full text preparation of selected papers | |
self.download_pdfs(papers, dirpath) | |
dirpath_pdfs = os.listdir(dirpath) | |
for file_name in dirpath_pdfs: | |
full_file_name = os.path.join(dirpath, file_name) | |
if os.path.isfile(full_file_name): | |
shutil.copy(full_file_name, pdf_dir) | |
self.print_fn("\n- converting extra pdfs.. ") | |
self.convert_pdfs(dirpath, txt_dir) | |
else: | |
self.print_fn("\n- downloading pdfs.. ") | |
# full text preparation of selected papers | |
self.download_pdfs(papers, pdf_dir) | |
self.print_fn("\n- converting pdfs.. ") | |
self.convert_pdfs(pdf_dir, txt_dir) | |
# plugging citations to our papers object | |
self.print_fn("\n- plugging in citation network.. ") | |
papers, cites = self.cocitation_network(papers, txt_dir) | |
joblib.dump(papers, dump_dir + 'papers_selected_pdf_route.dmp') | |
from distutils.dir_util import copy_tree | |
copy_tree(txt_dir, dump_dir + os.path.basename(txt_dir)) | |
copy_tree(pdf_dir, dump_dir + os.path.basename(pdf_dir)) | |
self.print_fn("\n- extracting structure.. ") | |
papers, ids_none = self.extract_structure(papers, pdf_dir, txt_dir, img_dir, dump_dir, tab_dir) | |
return ids_none, papers, cites | |
def tar_route(self, pdf_dir, txt_dir, img_dir, tab_dir, papers): | |
## Data prep | |
import joblib | |
# test full again - check images - check dfs !! | |
self.clean_dirs([pdf_dir, txt_dir, img_dir, tab_dir]) | |
# full text preparation of selected papers | |
self.download_sources(papers, pdf_dir) | |
self.convert_pdfs(pdf_dir, txt_dir) | |
# plugging citations to our papers object | |
papers, cites = self.cocitation_network(papers, txt_dir) | |
joblib.dump(papers, 'papers_selected_tar_route.dmp') | |
papers = self.extract_structure(papers, pdf_dir, txt_dir, img_dir, tab_dir) | |
joblib.dump(papers, 'papers_extracted_tar_route.dmp') | |
return papers | |
def build_doc(self, research_sections, papers, query=None, filename='survey.txt'): | |
import arxiv2bib | |
self.print_fn("\n-building bibliography entries.. ") | |
bibentries = arxiv2bib.arxiv2bib([p['id'] for p in papers]) | |
bibentries = [r.bibtex() for r in bibentries] | |
self.print_fn("\n-building final survey file .. at "+ filename) | |
file = open(filename, 'w+') | |
if query is None: | |
query = 'Internal(existing) research' | |
file.write("----------------------------------------------------------------------") | |
file.write("Title: A survey on " + query) | |
self.survey_print_fn("") | |
self.survey_print_fn("----------------------------------------------------------------------") | |
self.survey_print_fn("Title: A survey on " + query) | |
file.write("Author: Auto-Research (github.com/sidphbot/Auto-Research)") | |
self.survey_print_fn("Author: Auto-Research (github.com/sidphbot/Auto-Research)") | |
file.write("Dev: Auto-Research (github.com/sidphbot/Auto-Research)") | |
self.survey_print_fn("Dev: Auto-Research (github.com/sidphbot/Auto-Research)") | |
file.write("Disclaimer: This survey is intended to be a research starter. This Survey is Machine-Summarized, "+ | |
"\nhence some sentences might be wrangled or grammatically incorrect. However all sentences are "+ | |
"\nmined with proper citations. As All of the text is practically quoted texted, hence to "+ | |
"\nimprove visibility, all the papers are duly cited in the Bibiliography section. as bibtex "+ | |
"\nentries(only to avoid LaTex overhead). ") | |
self.survey_print_fn("Disclaimer: This survey is intended to be a research starter. This Survey is Machine-Summarized, "+ | |
"\nhence some sentences might be wrangled or grammatically incorrect. However all sentences are "+ | |
"\nmined with proper citations. As All of the text is practically quoted texted, hence to "+ | |
"\nimprove visibility, all the papers are duly cited in the Bibiliography section. as bibtex "+ | |
"\nentries(only to avoid LaTex overhead). ") | |
file.write("----------------------------------------------------------------------") | |
self.survey_print_fn("----------------------------------------------------------------------") | |
file.write("") | |
self.survey_print_fn("") | |
file.write('ABSTRACT') | |
self.survey_print_fn('ABSTRACT') | |
self.survey_print_fn("=================================================") | |
file.write("=================================================") | |
file.write("") | |
self.survey_print_fn("") | |
file.write(research_sections['abstract']) | |
self.survey_print_fn(research_sections['abstract']) | |
file.write("") | |
self.survey_print_fn("") | |
file.write('INTRODUCTION') | |
self.survey_print_fn('INTRODUCTION') | |
self.survey_print_fn("=================================================") | |
file.write("=================================================") | |
file.write("") | |
self.survey_print_fn("") | |
file.write(research_sections['introduction']) | |
self.survey_print_fn(research_sections['introduction']) | |
file.write("") | |
self.survey_print_fn("") | |
for k, v in research_sections.items(): | |
if k not in ['abstract', 'introduction', 'conclusion']: | |
file.write(k.upper()) | |
self.survey_print_fn(k.upper()) | |
self.survey_print_fn("=================================================") | |
file.write("=================================================") | |
file.write("") | |
self.survey_print_fn("") | |
file.write(v) | |
self.survey_print_fn(v) | |
file.write("") | |
self.survey_print_fn("") | |
file.write('CONCLUSION') | |
self.survey_print_fn('CONCLUSION') | |
self.survey_print_fn("=================================================") | |
file.write("=================================================") | |
file.write("") | |
self.survey_print_fn("") | |
file.write(research_sections['conclusion']) | |
self.survey_print_fn(research_sections['conclusion']) | |
file.write("") | |
self.survey_print_fn("") | |
file.write('REFERENCES') | |
self.survey_print_fn('REFERENCES') | |
self.survey_print_fn("=================================================") | |
file.write("=================================================") | |
file.write("") | |
self.survey_print_fn("") | |
for entry in bibentries: | |
file.write(entry) | |
self.survey_print_fn(entry) | |
file.write("") | |
self.survey_print_fn("") | |
self.survey_print_fn("========================XXX=========================") | |
file.write("========================XXX=========================") | |
file.close() | |
def build_basic_blocks(self, corpus_known_sections, corpus): | |
research_blocks = {} | |
for head, textarr in corpus_known_sections.items(): | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
# self.print_fn(head.upper()) | |
with torch.no_grad(): | |
summtext = self.model(" ".join([l.lower() for l in textarr]), ratio=0.5) | |
res = self.nlp(summtext) | |
res = set([str(sent) for sent in list(res.sents)]) | |
summtext = ''.join([line for line in res]) | |
# pself.print_fn(summtext) | |
research_blocks[head] = summtext | |
return research_blocks | |
def abstractive_summary(self, longtext): | |
''' | |
faulty method | |
input_ids = ledtokenizer(longtext, return_tensors="pt").input_ids | |
global_attention_mask = torch.zeros_like(input_ids) | |
# set global_attention_mask on first token | |
global_attention_mask[:, 0] = 1 | |
sequences = ledmodel.generate(input_ids, global_attention_mask=global_attention_mask).sequences | |
summary = ledtokenizer.batch_decode(sequences) | |
''' | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
inputs = self.ledtokenizer.prepare_seq2seq_batch(longtext, truncation=True, padding='longest', | |
return_tensors='pt').to(self.torch_device) | |
with torch.no_grad(): | |
summary_ids = self.ledmodel.generate(**inputs) | |
summary = self.ledtokenizer.batch_decode(summary_ids, skip_special_tokens=True, | |
clean_up_tokenization_spaces=True) | |
res = self.nlp(summary[0]) | |
res = set([str(sent) for sent in list(res.sents)]) | |
summtext = ''.join([line for line in res]) | |
#self.print_fn("abstractive summary type:" + str(type(summary))) | |
return summtext | |
def get_abstract(self, abs_lines, corpus_known_sections, research_blocks): | |
# abs_lines = " ".join(abs_lines) | |
abs_lines = "" | |
abs_lines += " ".join([l.lower() for l in corpus_known_sections['abstract']]) | |
abs_lines += research_blocks['abstract'] | |
# self.print_fn(abs_lines) | |
try: | |
return self.abstractive_summary(abs_lines) | |
except: | |
highlights = self.extractive_summary(abs_lines) | |
return self.abstractive_summary(highlights) | |
def get_corpus_lines(self, corpus): | |
abs_lines = [] | |
types = set() | |
for k, v in corpus.items(): | |
# self.print_fn(v) | |
types.add(type(v)) | |
abstext = k + '. ' + v.replace('\n', ' ') | |
abstext = self.nlp(abstext) | |
abs_lines.extend([str(sent).lower() for sent in list(abstext.sents)]) | |
#self.print_fn("unique corpus value types:" + str(types)) | |
# abs_lines = '\n'.join([str(sent) for sent in abs_lines.sents]) | |
return abs_lines | |
def get_sectioned_docs(self, papers, papers_meta): | |
import random | |
docs = [] | |
for p in papers: | |
for section in p['sections']: | |
if len(section['highlights']) > 0: | |
if self.high_gpu: | |
content = self.generate_title(section['highlights']) | |
else: | |
content = self.extractive_summary(''.join(section['highlights'])) | |
docs.append(content) | |
selected_pids = [p['id'] for p in papers] | |
meta_abs = [] | |
for p in papers_meta: | |
if p['id'] not in selected_pids: | |
meta_abs.append(self.generate_title(p['abstract'])) | |
docs.extend(meta_abs) | |
#self.print_fn("meta_abs num"+str(len(meta_abs))) | |
#self.print_fn("selected_pids num"+str(len(selected_pids))) | |
#self.print_fn("papers_meta num"+str(len(papers_meta))) | |
#assert (len(meta_abs) + len(selected_pids) == len(papers_meta)) | |
assert ('str' in str(type(random.sample(docs, 1)[0]))) | |
return [doc for doc in docs if doc != ''] | |
def cluster_lines(self, abs_lines): | |
from sklearn.cluster import KMeans | |
# from bertopic import BERTopic | |
# topic_model = BERTopic(embedding_model=embedder) | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
corpus_embeddings = self.embedder.encode(abs_lines) | |
# Normalize the embeddings to unit length | |
corpus_embeddings = corpus_embeddings / np.linalg.norm(corpus_embeddings, axis=1, keepdims=True) | |
with torch.no_grad(): | |
optimal_k = self.model.calculate_optimal_k(' '.join(abs_lines), k_max=10) | |
# Perform kmean clustering | |
clustering_model = KMeans(n_clusters=optimal_k, n_init=20) | |
# clustering_model = AgglomerativeClustering(n_clusters=optimal_k, affinity='cosine', linkage='average') #, affinity='cosine', linkage='average', distance_threshold=0.4) | |
clustering_model.fit(corpus_embeddings) | |
cluster_assignment = clustering_model.labels_ | |
clustered_sentences = {} | |
dummy_count = 0 | |
for sentence_id, cluster_id in enumerate(cluster_assignment): | |
if cluster_id not in clustered_sentences: | |
clustered_sentences[cluster_id] = [] | |
''' | |
if dummy_count < 5: | |
self.print_fn("abs_line: "+abs_lines[sentence_id]) | |
self.print_fn("cluster_ID: "+str(cluster_id)) | |
self.print_fn("embedding: "+str(corpus_embeddings[sentence_id])) | |
dummy_count += 1 | |
''' | |
clustered_sentences[cluster_id].append(abs_lines[sentence_id]) | |
# for i, cluster in clustered_sentences.items(): | |
# self.print_fn("Cluster ", i+1) | |
# self.print_fn(cluster) | |
# self.print_fn("") | |
return self.get_clustered_sections(clustered_sentences), clustered_sentences | |
def get_clusters(self, papers, papers_meta): | |
from sklearn.cluster import KMeans | |
# from bertopic import BERTopic | |
# topic_model = BERTopic(embedding_model=embedder) | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
abs_lines = self.get_sectioned_docs(papers, papers_meta) | |
corpus_embeddings = self.embedder.encode(abs_lines) | |
# Normalize the embeddings to unit length | |
corpus_embeddings = corpus_embeddings / np.linalg.norm(corpus_embeddings, axis=1, keepdims=True) | |
with torch.no_grad(): | |
optimal_k = self.model.calculate_optimal_k(' '.join(abs_lines), k_max=10) | |
# Perform kmean clustering | |
clustering_model = KMeans(n_clusters=optimal_k, n_init=20) | |
# clustering_model = AgglomerativeClustering(n_clusters=optimal_k, affinity='cosine', linkage='average') #, affinity='cosine', linkage='average', distance_threshold=0.4) | |
clustering_model.fit(corpus_embeddings) | |
cluster_assignment = clustering_model.labels_ | |
clustered_sentences = {} | |
dummy_count = 0 | |
for sentence_id, cluster_id in enumerate(cluster_assignment): | |
if cluster_id not in clustered_sentences: | |
clustered_sentences[cluster_id] = [] | |
''' | |
if dummy_count < 5: | |
self.print_fn("abs_line: "+abs_lines[sentence_id]) | |
self.print_fn("cluster_ID: "+str(cluster_id)) | |
self.print_fn("embedding: "+str(corpus_embeddings[sentence_id])) | |
dummy_count += 1 | |
''' | |
clustered_sentences[cluster_id].append(abs_lines[sentence_id]) | |
# for i, cluster in clustered_sentences.items(): | |
# self.print_fn("Cluster ", i+1) | |
# self.print_fn(cluster) | |
# self.print_fn("") | |
return self.get_clustered_sections(clustered_sentences), clustered_sentences | |
def generate_title(self, longtext): | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
inputs = self.title_tokenizer.prepare_seq2seq_batch(longtext, truncation=True, padding='longest', | |
return_tensors='pt').to(self.torch_device) | |
with torch.no_grad(): | |
summary_ids = self.title_model.generate(**inputs) | |
summary = self.title_tokenizer.batch_decode(summary_ids, skip_special_tokens=True, | |
clean_up_tokenization_spaces=True) | |
return str(summary[0]) | |
def get_clustered_sections(self, clustered_lines): | |
clusters_dict = {} | |
for i, cluster in clustered_lines.items(): | |
# self.print_fn(cluster) | |
try: | |
clusters_dict[self.generate_title(str(" ".join(cluster)))] = self.abstractive_summary( | |
str(" ".join(cluster)).lower()) | |
except: | |
clusters_dict[self.generate_title(str(" ".join(cluster)))] = self.abstractive_summary( | |
self.extractive_summary(str(" ".join(cluster)).lower())) | |
return clusters_dict | |
def get_intro(self, corpus_known_sections, research_blocks): | |
intro_lines = "" | |
intro_lines += str(" ".join([l.lower() for l in corpus_known_sections['introduction']])) + str( | |
" ".join([l.lower() for l in corpus_known_sections['conclusion']])) | |
intro_lines += research_blocks['introduction'] + research_blocks['conclusion'] | |
try: | |
return self.abstractive_summary(intro_lines) | |
except: | |
return self.abstractive_summary(self.extractive_summary(intro_lines)) | |
def get_conclusion(self, research_sections): | |
paper_body = "" | |
for k, v in research_sections.items(): | |
paper_body += v | |
try: | |
return self.abstractive_summary(paper_body) | |
except: | |
return self.abstractive_summary(self.extractive_summary(paper_body)) | |
def build_corpus_sectionwise(self, papers): | |
known = ['abstract', 'introduction', 'conclusion'] | |
corpus_known_sections = {} | |
for kh in known: | |
khtext = [] | |
for p in papers: | |
for section in p['sections']: | |
if kh in section['heading']: | |
khtext.extend(section['highlights']) | |
# self.print_fn(khtext) | |
corpus_known_sections[kh] = khtext | |
return corpus_known_sections | |
def standardize_headings(self, papers): | |
known = ['abstract', 'introduction', 'discussion', 'relatedwork', 'contribution', 'analysis', 'experiments', | |
'conclusion'] | |
for p in papers: | |
# self.print_fn("================================") | |
headings = [section['heading'] for section in p['sections'] if len(section['heading'].split()) < 3] | |
# self.print_fn("id: "+ str(p['id'])+"\nHeadings: \n"+str('\n'.join(headings))) | |
for kh in known: | |
for section in p['sections']: | |
if len(section['heading'].split()) < 3: | |
# self.print_fn(section['heading']) | |
if kh in ''.join(filter(str.isalpha, section['heading'].replace(' ', '').lower())): | |
# self.print_fn("orig head: "+ section['heading'] +", plain head:" + kh) | |
section['heading'] = kh | |
return papers | |
def build_corpus(self, papers, papers_meta): | |
corpus = self.build_meta_corpus(papers_meta) | |
for p in papers: | |
ph = [] | |
for sid, section in enumerate(p['sections']): | |
ph.extend(section['highlights']) | |
for pid, ls in corpus.items(): | |
if pid == p['id']: | |
corpus[pid] = p['abstract'] + str(' '.join(ph)) | |
''' | |
self.print_fn("================== final corpus ====================") | |
self.print_fn('\n'.join([str("paper: "+ get_by_pid(pid, papers_meta)['title']+" \nhighlight count: " + str(len(phs))) for pid, phs in corpus.items()])) | |
self.print_fn("======== sample point ========") | |
p = random.choice(list(papers)) | |
self.print_fn("paper: "+ p['title']+" \nhighlights: " + str(corpus[p['id']])) | |
self.print_fn("======== sample meta point ========") | |
p = random.choice(list(papers_meta)) | |
self.print_fn("meta paper: "+ p['title']+" \nhighlights: " + str(corpus[p['id']])) | |
''' | |
return corpus | |
def get_by_pid(self, pid, papers): | |
for p in papers: | |
if p['id'] == pid: | |
return p | |
def build_meta_corpus(self, papers): | |
meta_corpus = {} | |
for p in papers: | |
# pself.print_fn(p) | |
pid = p['id'] | |
ptext = p['title'] + ". " + p['abstract'] | |
doc = self.nlp(ptext) | |
phs, _, _ = self.extractive_highlights([str(sent) for sent in list(doc.sents)]) | |
meta_corpus[pid] = str(' '.join(phs)) | |
''' | |
self.print_fn("================== meta corpus ====================") | |
self.print_fn('\n'.join([str("paper: "+ get_by_pid(pid, papers)['title']+" \nhighlight count: " + str(len(phs))) for pid, phs in meta_corpus.items()])) | |
self.print_fn("======== sample point ========") | |
p = random.choice(list(papers)) | |
self.print_fn("paper: "+ p['title']+" \nhighlights: " + str(meta_corpus[p['id']])) | |
''' | |
return meta_corpus | |
def select_papers(self, papers, query, num_papers=20): | |
import numpy as np | |
# self.print_fn("paper sample: ") | |
# self.print_fn(papers) | |
meta_corpus = self.build_meta_corpus(papers) | |
scores = [] | |
pids = [] | |
for id, highlights in meta_corpus.items(): | |
score = self.text_para_similarity(query, highlights) | |
scores.append(score) | |
pids.append(id) | |
self.print_fn("corpus item: " + str(self.get_by_pid(id, papers)['title'])) | |
idx = np.argsort(scores)[:num_papers] | |
#for i in range(len(scores)): | |
# self.print_fn("paper: " + str(self.get_by_pid(pids[i], papers)['title'])) | |
# self.print_fn("score: " + str(scores[i])) | |
# self.print_fn("argsort ids("+str(num_papers)+" papers): "+ str(idx)) | |
idx = [pids[i] for i in idx] | |
# self.print_fn("argsort pids("+str(num_papers)+" papers): "+ str(idx)) | |
papers_selected = [p for p in papers if p['id'] in idx] | |
# assert(len(papers_selected)==num_papers) | |
self.print_fn("num papers selected: " + str(len(papers_selected))) | |
for p in papers_selected: | |
self.print_fn("Selected Paper: " + p['title']) | |
self.print_fn("constrast with natural selection: forward") | |
for p in papers[:4]: | |
self.print_fn("Selected Paper: " + p['title']) | |
self.print_fn("constrast with natural selection: backward") | |
for p in papers[-4:]: | |
self.print_fn("Selected Paper: " + p['title']) | |
# arxiv search producing better relevnce | |
return papers_selected | |
def extractive_summary(self, text): | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
with torch.no_grad(): | |
res = self.model(text, ratio=0.5) | |
res_doc = self.nlp(res) | |
return " ".join(set([str(sent) for sent in list(res_doc.sents)])) | |
def extractive_highlights(self, lines): | |
# text = " ".join(lines) | |
# text_doc = nlp(" ".join([l.lower() for l in lines])) | |
# text = ' '.join([ str(sent) for sent in list(text_doc.sents)]) | |
if 'cuda' in self.torch_device: | |
torch.cuda.empty_cache() | |
with torch.no_grad(): | |
res = self.model(" ".join([l.lower() for l in lines]), ratio=0.5, ) | |
res_doc = self.nlp(res) | |
res_lines = set([str(sent) for sent in list(res_doc.sents)]) | |
# self.print_fn("\n-".join(res_sents)) | |
with torch.no_grad(): | |
keywords = self.kw_model.extract_keywords(str(" ".join([l.lower() for l in lines])), stop_words='english') | |
keyphrases = self.kw_model.extract_keywords(str(" ".join([l.lower() for l in lines])), | |
keyphrase_ngram_range=(4, 4), | |
stop_words='english', use_mmr=True, diversity=0.7) | |
return res_lines, keywords, keyphrases | |
def extract_highlights(self, papers): | |
for p in papers: | |
sid = 0 | |
p['sections'] = [] | |
for heading, lines in p['body_text'].items(): | |
hs, kws, kps = self.extractive_highlights(lines) | |
p['sections'].append({ | |
'sid': sid, | |
'heading': heading, | |
'text': lines, | |
'highlights': hs, | |
'keywords': kws, | |
'keyphrases': kps, | |
}) | |
sid += 1 | |
return papers | |
def extract_structure(self, papers, pdf_dir, txt_dir, img_dir, dump_dir, tab_dir, tables=False): | |
self.print_fn("\n-extracting sections.. ") | |
papers, ids_none = self.extract_parts(papers, txt_dir, dump_dir) | |
self.print_fn("\n-extracting images.. for future correlation use-cases ") | |
papers = self.extract_images(papers, pdf_dir, img_dir) | |
if tables: | |
self.print_fn("\n-extracting tables.. for future correlation use-cases ") | |
papers = self.extract_tables(papers, pdf_dir, tab_dir) | |
return papers, ids_none | |
def extract_parts(self, papers, txt_dir, dump_dir): | |
headings_all = {} | |
# refined = [] | |
# model = build_summarizer() | |
#for file in glob.glob(txt_dir + '/*.txt'): | |
for p in papers: | |
file = txt_dir + '/'+ p['id'] +'.txt' | |
refined, headings_extracted = self.extract_headings(file) | |
sections = self.extract_sections(headings_extracted, refined) | |
# highlights = {k: extract_highlights(model,v) for k, v in sections.items()} | |
#p = self.get_by_file(file, papers) | |
#if len(headings_extracted) > 3: | |
p['body_text'] = sections | |
# p['body_highlights'] = highlights | |
headings_all[p['id']] = headings_extracted | |
ids_none = {i: h for i, h in headings_all.items() if len(h) < 3} | |
''' | |
for f, h in headings_all.items(): | |
if len(h) < 4: | |
self.print_fn("=================headings almost undetected================") | |
self.print_fn(f) | |
self.print_fn(h) | |
''' | |
# from pprint import pprint | |
# pself.print_fn({f: len(h) for f,h in headings_all.items()}) | |
papers_none = [p for p in papers if p['id'] in ids_none] | |
for p in papers_none: | |
os.remove(txt_dir + '/'+ p['id'] + '.txt') | |
papers.remove(p) | |
return papers, ids_none | |
def check_para(self, df): | |
size = 0 | |
for col in df.columns: | |
size += df[col].apply(lambda x: len(str(x))).median() | |
return size / len(df.columns) > 25 | |
def scan_blocks(self, lines): | |
lines_mod = [line.strip().replace('\n', '') for line in lines if len(line.strip().replace('\n', '')) > 3] | |
for i in range(len(lines_mod)): | |
yield lines_mod[i:i + 3] | |
def extract_sections(self, headings, lines, min_part_length=2): | |
sections = {} | |
self.check_list_elems_in_list(headings, lines) | |
head_len = len(headings) | |
for i in range(len(headings) - 1): | |
start = headings[i] | |
end = headings[i + 1] | |
section = self.get_section(start, end, lines) | |
# self.print_fn(start + " : "+ str(len(section)) +" lines") | |
''' | |
if i > 0: | |
old = headings[i-1] | |
if len(section) < min_part_length + 1: | |
sections[old].extend(start) | |
sections[old].extend(section) | |
else: | |
sections[start] = section | |
else: | |
sections[start] = section | |
''' | |
sections[start] = section | |
return {k: v for k, v in sections.items()} | |
def is_rubbish(self, s, rubbish_tolerance=0.2, min_char_len=4): | |
# numbers = sum(c.isdigit() for c in s) | |
letters = sum(c.isalpha() for c in s) | |
spaces = sum(c.isspace() for c in s) | |
# others = len(s) - numbers - letters - spaces | |
if len(s) == 0: | |
return False | |
if ((len(s) - (letters + spaces)) / len(s) >= rubbish_tolerance) or self.alpha_length(s) < min_char_len: | |
return True | |
else: | |
return False | |
def get_section(self, first, last, lines): | |
try: | |
assert (first in lines) | |
assert (last in lines) | |
# start = lines.index( first ) + len( first ) | |
# end = lines.index( last, start ) | |
start = [i for i in range(len(lines)) if first is lines[i]][0] | |
end = [i for i in range(len(lines)) if last is lines[i]][0] | |
section_lines = lines[start + 1:end] | |
# self.print_fn("heading: " + str(first)) | |
# self.print_fn("section_lines: "+ str(section_lines)) | |
# self.print_fn(section_lines) | |
return section_lines | |
except ValueError: | |
self.print_fn("value error :") | |
self.print_fn("first heading :" + str(first) + ", second heading :" + str(last)) | |
self.print_fn("first index :" + str(start) + ", second index :" + str(end)) | |
return "" | |
def check_list_elems_in_list(self, headings, lines): | |
import numpy as np | |
# [self.print_fn(head) for head in headings if head not in lines ] | |
return np.all([True if head in lines else False for head in headings]) | |
def check_first_char_upper(self, text): | |
for c in text: | |
if c.isspace(): | |
continue | |
elif c.isalpha(): | |
return c.isupper() | |
def extract_headings(self, txt_file): | |
import re | |
fulltext = self.read_paper(txt_file) | |
lines = self.clean_lines(fulltext) | |
refined, headings = self.scan_text(lines) | |
assert (self.check_list_elems_in_list(headings, refined)) | |
headings = self.check_duplicates(headings) | |
# self.print_fn('===========================================') | |
# self.print_fn(txt_file +": first scan: \n"+str(len(headings))+" headings") | |
# self.print_fn('\n'.join(headings)) | |
# scan_failed - rescan with first match for abstract hook | |
if len(headings) == 0: | |
# self.print_fn('===================') | |
# self.print_fn("run 1 failed") | |
abs_cans = [line for line in lines if 'abstract' in re.sub("\s+", "", line.strip().lower())] | |
if len(abs_cans) != 0: | |
abs_head = abs_cans[0] | |
refined, headings = self.scan_text(lines, abs_head=abs_head) | |
self.check_list_elems_in_list(headings, refined) | |
headings = self.check_duplicates(headings) | |
# self.print_fn('===================') | |
# self.print_fn(txt_file +": second scan: \n"+str(len(headings))+" headings") | |
# if len(headings) == 0: | |
# self.print_fn("heading scan failed completely") | |
return refined, headings | |
def check_duplicates(self, my_list): | |
my_finallist = [] | |
dups = [s for s in my_list if my_list.count(s) > 1] | |
if len(dups) > 0: | |
[my_finallist.append(n) for n in my_list if n not in my_finallist] | |
# self.print_fn("original: "+str(len(my_list))+" new: "+str(len(my_finallist))) | |
return my_finallist | |
def clean_lines(self, text): | |
import numpy as np | |
import re | |
# doc = nlp(text) | |
# lines = [str(sent) for sent in doc.sents] | |
lines = text.replace('\r', '').split('\n') | |
lines = [line for line in lines if not self.is_rubbish(line)] | |
lines = [line for line in lines if | |
re.match("^[a-zA-Z1-9\.\[\]\(\):\-,\"\"\s]*$", line) and not 'Figure' in line and not 'Table' in line] | |
lengths_cleaned = [self.alpha_length(line) for line in lines] | |
mean_length_cleaned = np.median(lengths_cleaned) | |
lines_standardized = [] | |
for line in lines: | |
if len(line) >= (1.8 * mean_length_cleaned): | |
first_half = line[0:len(line) // 2] | |
second_half = line[len(line) // 2 if len(line) % 2 == 0 else ((len(line) // 2) + 1):] | |
lines_standardized.append(first_half) | |
lines_standardized.append(second_half) | |
else: | |
lines_standardized.append(line) | |
return lines | |
def scan_text(self, lines, abs_head=None): | |
import re | |
# self.print_fn('\n'.join(lines)) | |
record = False | |
headings = [] | |
refined = [] | |
for i in range(1, len(lines) - 4): | |
line = lines[i] | |
line = line.replace('\n', '').strip() | |
if 'abstract' in re.sub("\s+", "", line.strip().lower()) and len(line) - len('abstract') < 5 or ( | |
abs_head is not None and abs_head in line): | |
record = True | |
headings.append(line) | |
refined.append(line) | |
if 'references' in re.sub("\s+", "", line.strip().lower()) and len(line) - len('references') < 5: | |
headings.append(line) | |
refined.append(line) | |
break | |
elif 'bibliography' in re.sub("\s+", "", line.strip().lower()) and len(line) - len('bibliography') < 5: | |
headings.append(line) | |
refined.append(line) | |
break | |
refined, headings = self.scanline(record, headings, refined, i, lines) | |
# self.print_fn('=========in scan_text loop i : '+str(i)+' heading count : '+str(len(headings))+' =========') | |
return refined, headings | |
def scanline(self, record, headings, refined, id, lines): | |
import numpy as np | |
import re | |
line = lines[id] | |
if not len(line) == 0: | |
# self.print_fn("in scanline") | |
# self.print_fn(line) | |
if record: | |
refined.append(line) | |
if len(lines[id - 1]) == 0 or len(lines[id + 1]) == 0 or re.match( | |
"^[1-9XVIABCD]{0,4}(\.{0,1}[1-9XVIABCD]{0,4}){0, 3}\s{0,2}[A-Z][a-zA-Z\:\-\s]*$", | |
line) and self.char_length(line) > 7: | |
# self.print_fn("candidate") | |
# self.print_fn(line) | |
if np.mean([len(s) for s in lines[id + 2:id + 6]]) > 40 and self.check_first_char_upper( | |
line) and re.match("^[a-zA-Z1-9\.\:\-\s]*$", line) and len(line.split()) < 10: | |
# if len(line) < 20 and np.mean([len(s) for s in lines[i+1:i+5]]) > 30 : | |
headings.append(line) | |
assert (line in refined) | |
# self.print_fn("selected") | |
# self.print_fn(line) | |
else: | |
known_headings = ['introduction', 'conclusion', 'abstract', 'references', 'bibliography'] | |
missing = [h for h in known_headings if not np.any([True for head in headings if h in head])] | |
# for h in missing: | |
head = [line for h in missing if h in re.sub("\s+", "", line.strip().lower())] | |
# head = [line for known] | |
if len(head) > 0: | |
headings.append(head[0]) | |
assert (head[0] in refined) | |
return refined, headings | |
def char_length(self, s): | |
# numbers = sum(c.isdigit() for c in s) | |
letters = sum(c.isalpha() for c in s) | |
# spaces = sum(c.isspace() for c in s) | |
# others = len(s) - numbers - letters - spaces | |
return letters | |
def get_by_file(self, file, papers): | |
import os | |
pid = os.path.basename(file) | |
pid = pid.replace('.txt', '').replace('.pdf', '') | |
for p in papers: | |
if p['id'] == pid: | |
return p | |
self.print_fn("\n-paper not found by file, \nfile: "+file+"\nall papers: "+', '.join([p['id'] for p in papers])) | |
def alpha_length(self, s): | |
# numbers = sum(c.isdigit() for c in s) | |
letters = sum(c.isalpha() for c in s) | |
spaces = sum(c.isspace() for c in s) | |
# others = len(s) - numbers - letters - spaces | |
return letters + spaces | |
def check_append(self, baselist, addstr): | |
check = False | |
for e in baselist: | |
if addstr in e: | |
check = True | |
if not check: | |
baselist.append(addstr) | |
return baselist | |
def extract_images(self, papers, pdf_dir, img_dir): | |
import fitz | |
# self.print_fn("in images") | |
for p in papers: | |
file = pdf_dir + p['id'] + ".pdf" | |
pdf_file = fitz.open(file) | |
images = [] | |
for page_index in range(len(pdf_file)): | |
page = pdf_file[page_index] | |
images.extend(page.getImageList()) | |
images_files = [self.save_image(pdf_file.extractImage(img[0]), i, p['id'], img_dir) for i, img in | |
enumerate(set(images)) if img[0]] | |
# self.print_fn(len(images_per_paper)) | |
p['images'] = images_files | |
# self.print_fn(len(p.keys())) | |
# self.print_fn(papers[0].keys()) | |
return papers | |
def extract_images_from_file(self, pdf_file_name, img_dir): | |
import fitz | |
pdf_file = fitz.open(pdf_file_name) | |
images = [] | |
for page_index in range(len(pdf_file)): | |
page = pdf_file[page_index] | |
images.extend(page.getImageList()) | |
images_files = [self.save_image(pdf_file.extractImage(img[0]), i, pdf_file_name.replace('.pdf', ''), img_dir) for i, img in | |
enumerate(set(images)) if img[0]] | |
return images_files | |
def save_image(self, base_image, img_index, pid, img_dir): | |
from PIL import Image | |
import io | |
image_bytes = base_image["image"] | |
# get the image extension | |
image_ext = base_image["ext"] | |
# load it to PIL | |
image = Image.open(io.BytesIO(image_bytes)) | |
# save it to local disk | |
fname = img_dir + "/" + str(pid) + "_" + str(img_index + 1) + "." + image_ext | |
image.save(open(f"{fname}", "wb")) | |
# self.print_fn(fname) | |
return fname | |
def save_tables(self, dfs, pid, tab_dir): | |
# todo | |
dfs = [df for df in dfs if not self.check_para(df)] | |
files = [] | |
for df in dfs: | |
filename = tab_dir + "/" + str(pid) + ".csv" | |
files.append(filename) | |
df.to_csv(filename, index=False) | |
return files | |
def extract_tables(self, papers, pdf_dir, tab_dir): | |
import tabula | |
check = True | |
# for file in glob.glob(pdf_dir+'/*.pdf'): | |
for p in papers: | |
dfs = tabula.read_pdf(pdf_dir + p['id'] + ".pdf", pages='all', multiple_tables=True, silent=True) | |
p['tables'] = self.save_tables(dfs, p['id'], tab_dir) | |
# self.print_fn(papers[0].keys()) | |
return papers | |
def extract_tables_from_file(self, pdf_file_name, tab_dir): | |
import tabula | |
check = True | |
# for file in glob.glob(pdf_dir+'/*.pdf'): | |
dfs = tabula.read_pdf(pdf_file_name, pages='all', multiple_tables=True, silent=True) | |
return self.save_tables(dfs, pdf_file_name.replace('.pdf', ''), tab_dir) | |
def search(self, query_text=None, id_list=None, max_search=100): | |
import arxiv | |
from urllib.parse import urlparse | |
if query_text: | |
search = arxiv.Search( | |
query=query_text, | |
max_results=max_search, | |
sort_by=arxiv.SortCriterion.Relevance | |
) | |
else: | |
id_list = [id for id in id_list if '.' in id] | |
search = arxiv.Search( | |
id_list=id_list | |
) | |
results = [result for result in search.get()] | |
searched_papers = [] | |
discarded_ids = [] | |
for result in results: | |
id = urlparse(result.entry_id).path.split('/')[-1].split('v')[0] | |
if '.' in id: | |
paper = { | |
'id': id, | |
'title': result.title, | |
'comments': result.comment if result.journal_ref else "None", | |
'journal-ref': result.journal_ref if result.journal_ref else "None", | |
'doi': str(result.doi), | |
'primary_category': result.primary_category, | |
'categories': result.categories, | |
'license': None, | |
'abstract': result.summary, | |
'published': result.published, | |
'pdf_url': result.pdf_url, | |
'links': [str(l) for l in result.links], | |
'update_date': result.updated, | |
'authors': [str(a.name) for a in result.authors], | |
} | |
searched_papers.append(paper) | |
else: | |
discarded_ids.append(urlparse(result.entry_id).path.split('/')[-1].split('v')[0]) | |
self.print_fn("\n-Papers discarded due to id error [arxiv api bug: #74] :\n" + str(discarded_ids)) | |
return results, searched_papers | |
def download_pdfs(self, papers, pdf_dir): | |
import arxiv | |
from urllib.parse import urlparse | |
ids = [p['id'] for p in papers] | |
self.print_fn("\n-downloading below selected papers: ") | |
self.print_fn(ids) | |
# asert(False) | |
papers_filtered = arxiv.Search(id_list=ids).get() | |
for p in papers_filtered: | |
p_id = str(urlparse(p.entry_id).path.split('/')[-1]).split('v')[0] | |
download_file = pdf_dir + "/" + p_id + ".pdf" | |
p.download_pdf(filename=download_file) | |
def download_sources(self, papers, src_dir): | |
import arxiv | |
from urllib.parse import urlparse | |
ids = [p['id'] for p in papers] | |
self.print_fn(ids) | |
# asert(False) | |
papers_filtered = arxiv.Search(id_list=ids).get() | |
for p in papers_filtered: | |
p_id = str(urlparse(p.entry_id).path.split('/')[-1]).split('v')[0] | |
download_file = src_dir + "/" + p_id + ".tar.gz" | |
p.download_source(filename=download_file) | |
def convert_pdfs(self, pdf_dir, txt_dir): | |
import glob, shutil | |
import multiprocessing | |
# import arxiv_public_data | |
convert_directory_parallel(pdf_dir, multiprocessing.cpu_count()) | |
for file in glob.glob(pdf_dir + '/*.txt'): | |
shutil.move(file, txt_dir) | |
def read_paper(self, path): | |
f = open(path, 'r', encoding="utf-8") | |
text = str(f.read()) | |
f.close() | |
return text | |
def cocitation_network(self, papers, txt_dir): | |
import multiprocessing | |
cites = internal_citations.citation_list_parallel(N=multiprocessing.cpu_count(), directory=txt_dir) | |
self.print_fn("\n-citation-network: ") | |
self.print_fn(cites) | |
for p in papers: | |
p['cites'] = cites[p['id']] | |
return papers, cites | |
def lookup_author(self, author_query): | |
from scholarly import scholarly | |
import operator | |
# Retrieve the author's data, fill-in, and print | |
self.print_fn("Searching Author: " + author_query) | |
search_result = next(scholarly.search_author(author_query), None) | |
if search_result is not None: | |
author = scholarly.fill(search_result) | |
author_stats = { | |
'name': author_query, | |
'affiliation': author['affiliation'] if author['affiliation'] else None, | |
'citedby': author['citedby'] if 'citedby' in author.keys() else 0, | |
'most_cited_year': max(author['cites_per_year'].items(), key=operator.itemgetter(1))[0] if len( | |
author['cites_per_year']) > 0 else None, | |
'coauthors': [c['name'] for c in author['coauthors']], | |
'hindex': author['hindex'], | |
'impact': author['i10index'], | |
'interests': author['interests'], | |
'publications': [{'title': p['bib']['title'], 'citations': p['num_citations']} for p in | |
author['publications']], | |
'url_picture': author['url_picture'], | |
} | |
else: | |
self.print_fn("author not found") | |
author_stats = { | |
'name': author_query, | |
'affiliation': "", | |
'citedby': 0, | |
'most_cited_year': None, | |
'coauthors': [], | |
'hindex': 0, | |
'impact': 0, | |
'interests': [], | |
'publications': [], | |
'url_picture': "", | |
} | |
# pself.print_fn(author_stats) | |
return author_stats | |
def author_stats(self, papers): | |
all_authors = [] | |
for p in papers: | |
paper_authors = [a for a in p['authors']] | |
all_authors.extend(paper_authors) | |
searched_authors = [self.lookup_author(a) for a in set(all_authors)] | |
return searched_authors | |
def text_similarity(self, text1, text2): | |
doc1 = self.similarity_nlp(text1) | |
doc2 = self.similarity_nlp(text2) | |
return doc1.similarity(doc2) | |
def text_para_similarity(self, text, lines): | |
doc1 = self.similarity_nlp(text) | |
doc2 = self.similarity_nlp(" ".join(lines)) | |
return doc1.similarity(doc2) | |
def para_para_similarity(self, lines1, lines2): | |
doc1 = self.similarity_nlp(" ".join(lines1)) | |
doc2 = self.similarity_nlp(" ".join(lines2)) | |
return doc1.similarity(doc2) | |
def text_image_similarity(self, text, image): | |
pass | |
def ask(self, corpus, question): | |
text = " ".join(corpus) | |
import torch | |
inputs = self.qatokenizer(question, text, return_tensors='pt') | |
start_positions = torch.tensor([1]) | |
end_positions = torch.tensor([3]) | |
outputs = self.qamodel(**inputs, start_positions=start_positions, end_positions=end_positions) | |
self.print_fn("context: " + text) | |
self.print_fn("question: " + question) | |
self.print_fn("outputs: " + outputs) | |
return outputs | |
def zip_outputs(self, dump_dir, query): | |
import zipfile | |
def zipdir(path, ziph): | |
# ziph is zipfile handle | |
for root, dirs, files in os.walk(path): | |
for file in files: | |
ziph.write(os.path.join(root, file), | |
os.path.relpath(os.path.join(root, file), | |
os.path.join(path, '../..'))) | |
zip_name = 'arxiv_dumps_'+query.replace(' ', '_')+'.zip' | |
zipf = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) | |
zipdir(dump_dir, zipf) | |
return zip_name | |
def survey(self, query=None, id_list=None, max_search=None, num_papers=None, debug=False, weigh_authors=False): | |
import joblib | |
import os, shutil | |
if not max_search: | |
max_search = self.DEFAULTS['max_search'] | |
if not num_papers: | |
num_papers = self.DEFAULTS['num_papers'] | |
if (query is None) and (id_list is None): | |
raise ValueError('please provide a base to survey on: list of arxiv IDs or a few research keywords') | |
# arxiv api relevance search and data preparation | |
self.print_fn("\n-searching arXiv for top 100 papers.. ") | |
results, searched_papers = self.search(query, id_list, max_search=max_search) | |
joblib.dump(searched_papers, self.dump_dir + 'papers_metadata.dmp') | |
self.print_fn("\n-found " + str(len(searched_papers)) + " papers") | |
# paper selection by scibert vector embedding relevance scores | |
# papers_selected = select_papers(searched_papers, query, num_papers=num_papers) | |
papers_highlighted, papers_selected = self.pdf_route(self.pdf_dir, self.txt_dir, self.img_dir, self.tab_dir, self.dump_dir, | |
searched_papers) | |
if weigh_authors: | |
authors = self.author_stats(papers_highlighted) | |
joblib.dump(papers_highlighted, self.dump_dir + 'papers_highlighted.dmp') | |
self.print_fn("\n-Standardizing known section headings per paper.. ") | |
papers_standardized = self.standardize_headings(papers_highlighted) | |
joblib.dump(papers_standardized, self.dump_dir + 'papers_standardized.dmp') | |
self.print_fn("\n-Building paper-wise corpus.. ") | |
corpus = self.build_corpus(papers_highlighted, searched_papers) | |
joblib.dump(corpus, self.dump_dir + 'corpus.dmp') | |
self.print_fn("\n-Building section-wise corpus.. ") | |
corpus_sectionwise = self.build_corpus_sectionwise(papers_standardized) | |
joblib.dump(corpus_sectionwise, self.dump_dir + 'corpus_sectionwise.dmp') | |
self.print_fn("\n-Building basic research highlights.. ") | |
research_blocks = self.build_basic_blocks(corpus_sectionwise, corpus) | |
joblib.dump(research_blocks, self.dump_dir + 'research_blocks.dmp') | |
self.print_fn("\n-Reducing corpus to lines.. ") | |
corpus_lines = self.get_corpus_lines(corpus) | |
joblib.dump(corpus_lines, self.dump_dir + 'corpus_lines.dmp') | |
# temp | |
# searched_papers = joblib.load(dump_dir + 'papers_metadata.dmp') | |
''' | |
papers_highlighted = joblib.load(dump_dir + 'papers_highlighted.dmp') | |
corpus = joblib.load(dump_dir + 'corpus.dmp') | |
papers_standardized = joblib.load(dump_dir + 'papers_standardized.dmp') | |
corpus_sectionwise = joblib.load(dump_dir + 'corpus_sectionwise.dmp') | |
research_blocks = joblib.load(dump_dir + 'research_blocks.dmp') | |
corpus_lines = joblib.load(dump_dir + 'corpus_lines.dmp') | |
''' | |
''' | |
self.print_fn("papers_highlighted types:"+ str(np.unique([str(type(p['sections'][0]['highlights'])) for p in papers_highlighted]))) | |
self.print_fn("papers_highlighted example:") | |
self.print_fn(random.sample(list(papers_highlighted), 1)[0]['sections'][0]['highlights']) | |
self.print_fn("corpus types:"+ str(np.unique([str(type(txt)) for k,txt in corpus.items()]))) | |
self.print_fn("corpus example:") | |
self.print_fn(random.sample(list(corpus.items()), 1)[0]) | |
self.print_fn("corpus_lines types:"+ str(np.unique([str(type(txt)) for txt in corpus_lines]))) | |
self.print_fn("corpus_lines example:") | |
self.print_fn(random.sample(list(corpus_lines), 1)[0]) | |
self.print_fn("corpus_sectionwise types:"+ str(np.unique([str(type(txt)) for k,txt in corpus_sectionwise.items()]))) | |
self.print_fn("corpus_sectionwise example:") | |
self.print_fn(random.sample(list(corpus_sectionwise.items()), 1)[0]) | |
self.print_fn("research_blocks types:"+ str(np.unique([str(type(txt)) for k,txt in research_blocks.items()]))) | |
self.print_fn("research_blocks example:") | |
self.print_fn(random.sample(list(research_blocks.items()), 1)[0]) | |
''' | |
# self.print_fn("corpus types:"+ str(np.unique([type(txt) for k,txt in corpus.items()]))) | |
self.print_fn("\n-Building abstract.. ") | |
abstract_block = self.get_abstract(corpus_lines, corpus_sectionwise, research_blocks) | |
joblib.dump(abstract_block, self.dump_dir + 'abstract_block.dmp') | |
''' | |
self.print_fn("abstract_block type:"+ str(type(abstract_block))) | |
self.print_fn("abstract_block:") | |
self.print_fn(abstract_block) | |
''' | |
self.print_fn("\n-Building introduction.. ") | |
intro_block = self.get_intro(corpus_sectionwise, research_blocks) | |
joblib.dump(intro_block, self.dump_dir + 'intro_block.dmp') | |
''' | |
self.print_fn("intro_block type:"+ str(type(intro_block))) | |
self.print_fn("intro_block:") | |
self.print_fn(intro_block) | |
''' | |
self.print_fn("\n-Building custom sections.. ") | |
clustered_sections, clustered_sentences = self.get_clusters(papers_standardized, searched_papers) | |
joblib.dump(clustered_sections, self.dump_dir + 'clustered_sections.dmp') | |
joblib.dump(clustered_sentences, self.dump_dir + 'clustered_sentences.dmp') | |
''' | |
self.print_fn("clusters extracted") | |
self.print_fn("clustered_sentences types:"+ str(np.unique([str(type(txt)) for k,txt in clustered_sentences.items()]))) | |
self.print_fn("clustered_sentences example:") | |
self.print_fn(random.sample(list(clustered_sections.items()), 1)[0]) | |
self.print_fn("clustered_sections types:"+ str(np.unique([str(type(txt)) for k,txt in clustered_sections.items()]))) | |
self.print_fn("clustered_sections example:") | |
self.print_fn(random.sample(list(clustered_sections.items()), 1)[0]) | |
''' | |
clustered_sections['abstract'] = abstract_block | |
clustered_sections['introduction'] = intro_block | |
joblib.dump(clustered_sections, self.dump_dir + 'research_sections.dmp') | |
self.print_fn("\n-Building conclusion.. ") | |
conclusion_block = self.get_conclusion(clustered_sections) | |
joblib.dump(conclusion_block, self.dump_dir + 'conclusion_block.dmp') | |
clustered_sections['conclusion'] = conclusion_block | |
''' | |
self.print_fn("conclusion_block type:"+ str(type(conclusion_block))) | |
self.print_fn("conclusion_block:") | |
self.print_fn(conclusion_block) | |
''' | |
survey_file = 'A_Survey_on_' + query.replace(' ', '_') + '.txt' | |
self.build_doc(clustered_sections, papers_standardized, query=query, filename=self.dump_dir + survey_file) | |
shutil.copytree('arxiv_data/', self.dump_dir + '/arxiv_data/') | |
shutil.copy(self.dump_dir + survey_file, survey_file) | |
assert (os.path.exists(survey_file)) | |
output_zip = self.zip_outputs(self.dump_dir, query) | |
self.print_fn("\n-Survey complete.. \nSurvey file path :" + os.path.abspath( | |
survey_file) + "\nAll outputs zip path :" + os.path.abspath(self.dump_dir + output_zip)) | |
return os.path.abspath(self.dump_dir + output_zip), os.path.abspath(survey_file) | |