# ReadReview / huixiangdou / service / findarticles.py
# root
# abstract function added
# 5c5c629
# yyj
import requests
import xml.etree.ElementTree as ET
import os
from tqdm import tqdm
import json
import shutil
from loguru import logger
from lxml import etree
import requests
from bs4 import BeautifulSoup
import os
def download_pdfs(path, doi_list):  # fox dalao contribution https://github.com/BigWhiteFox
    """Download the PDFs for one or more DOIs from sci-hub.se into ``path``.

    For every DOI the sci-hub page is fetched and each ``<button>`` carrying
    an ``onclick`` handler is inspected; the first single-quoted string in the
    handler is taken as the PDF link. Every link found is then downloaded to
    ``<path>/<doi with '/' replaced by '_'>.pdf``.

    Args:
        path: Directory the PDFs are written to; created if missing.
        doi_list: A single DOI string or an iterable of DOI strings.

    Returns:
        list: Paths of the files that were written. The list is empty (and
        therefore falsy) when nothing could be downloaded, so callers can
        use the result as a success flag.
    """
    from urllib.parse import urljoin

    # Make sure the download directory exists.
    os.makedirs(path, exist_ok=True)
    if isinstance(doi_list, str):
        doi_list = [doi_list]

    # Without a timeout a stalled connection would block the caller forever.
    timeout = 60

    href_list = []
    for doi in doi_list:
        url = f"https://sci-hub.se/{doi}"
        try:
            response = requests.get(url, timeout=timeout)
        except requests.RequestException as e:
            print(f"Failed to download due to an exception: {e}")
            continue
        # Check whether the request succeeded; skip this DOI otherwise.
        if response.status_code == 200:
            print(f"成功请求:{url}")
        else:
            print(f"请求失败:{url},状态码:{response.status_code}")
            continue
        soup = BeautifulSoup(response.text, 'html.parser')
        for button in soup.find_all('button', onclick=True):
            # The link is the first single-quoted token of the handler.
            # Handlers without a quoted token are ignored instead of
            # raising IndexError.
            pieces = (button.get('onclick') or '').split("'")
            if len(pieces) < 2 or not pieces[1]:
                continue
            pdf_url = pieces[1]
            href_list.append((pdf_url, doi))
            print("pdf_url:", pdf_url)
    print("href_list:", href_list)

    # Download every collected link.
    saved_files = []
    for href, doi in href_list:
        # urljoin copes with protocol-relative ("//host/x.pdf"),
        # root-relative ("/x.pdf") and absolute links alike.
        pdf_url = urljoin("https://sci-hub.se/", href)
        file_path = os.path.join(path, doi.replace("/", "_") + ".pdf")
        try:
            with requests.get(pdf_url, stream=True, timeout=timeout) as response:
                if response.status_code != 200:
                    print(f"Download failed, Status Code: {response.status_code}, URL: {pdf_url}")
                    continue
                with open(file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            print(f"File downloaded and saved as: {file_path}")
            if file_path not in saved_files:
                saved_files.append(file_path)
        except requests.RequestException as e:
            print(f"Failed to download due to an exception: {e}")
    return saved_files
class ArticleRetrieval:
    """Look up PubMed articles and store their full texts and abstracts.

    Articles are identified by PMIDs given directly and/or found through a
    keyword search. Full texts are taken from PMC when a PMC ID exists,
    otherwise from sci-hub via the DOI. Full texts go to ``repo_dir`` and
    abstracts go to ``repo_dir + '_ab'``; a summary of the run is written to
    ``repo_dir/info.json``.
    """

    # Seconds to wait for a single HTTP request before giving up.
    REQUEST_TIMEOUT = 60

    def __init__(self,
                 keywords: list = None,
                 pmids: list = None,
                 repo_dir='repodir',
                 retmax=500):
        """Store the search settings and reset all result attributes.

        Args:
            keywords: Search terms for the PubMed keyword search.
            pmids: PubMed IDs to retrieve in addition to the search hits.
            repo_dir: Directory for full texts and ``info.json``.
            retmax: Maximum number of hits requested from the keyword search.

        Raises:
            ValueError: If neither keywords nor pmids are given.
        """
        if not keywords and not pmids:
            raise ValueError("Either keywords or pmids must be provided.")
        # A bare string/number is treated as a single item.
        if isinstance(keywords, str):
            keywords = [keywords]
        if isinstance(pmids, (str, int)):
            pmids = [pmids]
        # Copy the inputs: esearch_pmc() extends self.pmids and must not
        # touch the caller's list or a default shared between instances.
        self.keywords = list(keywords) if keywords else []
        self.pmids = [str(p) for p in pmids] if pmids else []
        self.repo_dir = repo_dir
        self.retmax = retmax
        self.pmc_ids = []
        # Result attributes get defaults here so that save_config() works
        # even when a step (e.g. the keyword search) was skipped.
        self.search_pmid = []
        self.esummary = []
        self.scihub_doi = []
        self.failed_pmids = []
        self.pmc_success = 0
        self.scihub_success = 0
        self.abstract_success = 0
        self.failed_download = []
        self.failed_abstract = []

    def esummary_pmc(self):
        """Query ESummary for ``self.pmids`` and sort them by available IDs.

        Sets ``esummary`` (list of ``(pmid, pmcid, doi)`` tuples),
        ``pmc_ids`` (articles with a PMC ID), ``scihub_doi`` (DOIs of
        articles without a PMC ID) and ``failed_pmids`` (neither ID found).
        """
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi?"
        results = []
        if self.pmids:  # nothing to ask for otherwise
            params = {
                "db": "pubmed",
                "id": ','.join(str(p) for p in self.pmids),
            }
            response = requests.get(base_url, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
            root = ET.fromstring(response.content)
            for docsum in root.findall('DocSum'):
                id_value = docsum.find('Id').text
                # Only the first matching item of each kind is used.
                doi_item = docsum.find('.//Item[@Name="doi"]')
                pmc_item = docsum.find('.//Item[@Name="pmc"]')
                doi = doi_item.text if doi_item is not None else None
                pmcid = pmc_item.text if pmc_item is not None else None
                results.append((id_value, pmcid, doi))

        logger.info(f"total {len(results)} articles:")
        logger.info(f"found {len([r for r in results if r[1] is not None])} articles with PMC ID.")
        logger.info(f"found {len([r for r in results if r[2] is not None])} articles with DOI.")
        logger.info(f"found {len([r for r in results if r[1] is None and r[2] is None])} articles without PMC ID and DOI.")

        self.esummary = results
        self.pmc_ids = [r[1] for r in results if r[1] is not None]
        self.scihub_doi = [r[2] for r in results if r[1] is None and r[2] is not None]
        self.failed_pmids = [r[0] for r in results if r[1] is None and r[2] is None]

    # Search the PubMed database for articles.
    def esearch_pmc(self):
        """Search PubMed for ``self.keywords`` and add the hits to ``self.pmids``.

        The hits are also kept separately in ``self.search_pmid``.
        """
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
        params = {
            "db": "pubmed",
            # NOTE(review): requests percent-encodes this '+', so the server
            # receives a literal plus sign rather than a space — confirm
            # this is the intended query syntax.
            "term": '+'.join(self.keywords),
            "retmax": self.retmax
        }
        response = requests.get(base_url, params=params,
                                timeout=self.REQUEST_TIMEOUT)
        root = ET.fromstring(response.content)
        idlist = root.find('.//IdList')
        # A response without <IdList> simply means "no hits".
        if idlist is None:
            pmids = []
        else:
            pmids = [id_element.text for id_element in idlist.findall('.//Id')]
        print(f"Found {len(pmids)} articles for keywords {self.keywords}.")
        self.search_pmid = pmids
        # Skip PMIDs that were already supplied so nothing is fetched twice.
        known = set(self.pmids)
        self.pmids.extend(p for p in pmids if p not in known)

    # Parse XML.
    def _get_all_text(self, element):
        """Return the text of ``element`` and all its descendants, in order.

        Returns an empty string when ``element`` is None.
        """
        if element is None:
            return ""
        text = element.text or ""
        for child in element:
            text += self._get_all_text(child)
            if child is not None and child.tail:
                text += child.tail
        return text

    # Clean XML.
    def _clean_xml(self, txt):
        """Reduce an article XML document to its long text paragraphs.

        The document is parsed with a recovering lxml parser and flattened
        to plain text. Everything from the first occurrence of 'REFERENCES'
        onwards is dropped, and only lines longer than 250 characters
        (after stripping) are kept, separated by blank lines.
        """
        parser = etree.XMLParser(recover=True)
        root = ET.fromstring(txt, parser=parser)
        txt = self._get_all_text(root)
        txt = txt.split('REFERENCES')[0]  # keep only the text before the references
        text = '\n\n'.join([t.strip() for t in txt.split('\n') if len(t.strip()) > 250])
        return text

    # Fetch full texts (PMC / sci-hub) and abstracts (PubMed).
    def fetch_full_text(self):
        """Download full texts and abstracts for the collected articles.

        Resets and then fills ``pmc_success``, ``scihub_success``,
        ``abstract_success``, ``failed_download`` and ``failed_abstract``.
        Files already present in the output directories are counted as
        successes and not downloaded again.
        """
        abstract_dir = self.repo_dir + '_ab'
        is_new_repo = not os.path.exists(self.repo_dir)
        # Create each directory independently: either one may already exist
        # without the other (e.g. after only repo_dir was deleted).
        os.makedirs(self.repo_dir, exist_ok=True)
        os.makedirs(abstract_dir, exist_ok=True)
        if is_new_repo:
            print(f"Saving articles to {self.repo_dir}.")

        self.pmc_success = 0
        self.scihub_success = 0
        self.abstract_success = 0
        self.failed_download = []
        self.failed_abstract = []

        downloaded = set(os.listdir(self.repo_dir))
        downloaded_ab = set(os.listdir(abstract_dir))

        self._fetch_pmc_texts(downloaded)
        self._fetch_scihub_pdfs(downloaded)
        self._fetch_abstracts(abstract_dir, downloaded_ab)

    def _fetch_pmc_texts(self, downloaded):
        """Save the cleaned PMC full text of every ID in ``self.pmc_ids``."""
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
        for pmc_id in tqdm(self.pmc_ids, desc="Fetching full texts", unit="article"):
            # Check whether the file was already downloaded.
            if f"{pmc_id}.txt" in downloaded:
                print(f"File already downloaded: {pmc_id}")
                self.pmc_success += 1
                continue
            params = {
                "db": "pmc",
                "id": pmc_id,
                "rettype": "xml",
                "retmode": "text"
            }
            try:
                response = requests.get(base_url, params=params,
                                        timeout=self.REQUEST_TIMEOUT)
            except requests.RequestException as e:
                # One unreachable article should not abort the whole run.
                logger.warning(f"request failed for {pmc_id}: {e}")
                self.failed_download.append(pmc_id)
                continue
            full_text = self._clean_xml(response.text)
            if full_text.strip() == '':
                self.failed_download.append(pmc_id)
                continue
            logger.info(full_text[:200])
            with open(os.path.join(self.repo_dir, f'{pmc_id}.txt'), 'w',
                      encoding='utf-8') as f:
                f.write(full_text)
            self.pmc_success += 1

    def _fetch_scihub_pdfs(self, downloaded):
        """Download a PDF via sci-hub for every DOI in ``self.scihub_doi``."""
        for doi in tqdm(self.scihub_doi, desc="Fetching full texts", unit="article"):
            pdf_name = f"{doi.replace('/','_')}.pdf"
            # Check whether the file was already downloaded.
            if pdf_name in downloaded:
                print(f"File already downloaded: {doi}")
                self.scihub_success += 1
                continue
            download_pdfs(path=self.repo_dir, doi_list=doi)
            # Success is judged by the file on disk rather than by the
            # return value of download_pdfs.
            if os.path.exists(os.path.join(self.repo_dir, pdf_name)):
                self.scihub_success += 1
            else:
                self.failed_download.append(doi)

    def _fetch_abstracts(self, abstract_dir, downloaded_ab):
        """Save the abstract of every PMID in ``self.pmids`` to ``abstract_dir``."""
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
        for pmid in tqdm(self.pmids, desc="Fetching abstract texts", unit="article"):
            # Check whether the file was already downloaded.
            if f"{pmid}.txt" in downloaded_ab:
                print(f"File already downloaded: {pmid}")
                self.abstract_success += 1
                continue
            params = {
                "db": "pubmed",
                "id": pmid,
                "retmode": "xml",
            }
            try:
                response = requests.get(base_url, params=params,
                                        timeout=self.REQUEST_TIMEOUT)
                root = ET.fromstring(response.content)
            except (requests.RequestException, ET.ParseError) as e:
                logger.warning(f"could not fetch abstract for {pmid}: {e}")
                self.failed_abstract.append(pmid)
                continue
            # An abstract may consist of several <AbstractText> sections and
            # may contain inline markup, so gather the complete text of
            # every section instead of only the first element's .text.
            sections = [self._get_all_text(node).strip()
                        for node in root.findall('.//AbstractText')]
            abstract_text = '\n'.join(s for s in sections if s)
            if abstract_text:
                with open(os.path.join(abstract_dir, f'{pmid}.txt'), 'w',
                          encoding='utf-8') as f:
                    f.write(abstract_text)
                self.abstract_success += 1
            else:
                self.failed_abstract.append(pmid)

    def save_config(self):
        """Write the settings and results of the run to ``repo_dir/info.json``."""
        searched = set(self.search_pmid)
        config = {
            'repo_dir': self.repo_dir,
            'keywords': self.keywords,
            'retmax': self.retmax,
            "search_pmids": self.search_pmid,
            # PMIDs supplied by the caller rather than found by the search.
            'import_pmids': [pmid for pmid in self.pmids if pmid not in searched],
            'failed_pmids': self.failed_pmids,
            'result': [
                {
                    'pmid': r[0],
                    'pmcid': r[1],
                    'doi': r[2]
                } for r in self.esummary
            ],
            "pmc_success_d": self.pmc_success,
            "scihub_success_d": self.scihub_success,
            "failed_download": self.failed_download,
            "abstract_success": self.abstract_success,
            "failed_abstract": self.failed_abstract
        }
        os.makedirs(self.repo_dir, exist_ok=True)
        # ensure_ascii=False emits raw non-ASCII characters, so the file
        # encoding is fixed to UTF-8 instead of the platform default.
        with open(os.path.join(self.repo_dir, 'info.json'), 'w',
                  encoding='utf-8') as f:
            json.dump(config, f, indent=4, ensure_ascii=False)

    def initiallize(self):
        """Run the whole pipeline: search, summarise, download, save config."""
        if self.keywords:
            print(self.keywords)
            self.esearch_pmc()  # get pmids from pubmed database using keywords
        self.esummary_pmc()  # get pmc ids from pubmed database using pmids
        self.fetch_full_text()  # get full text from pmc database using pmc ids
        self.save_config()  # save config file

    # Correctly spelled alias; the original name is kept for existing callers.
    initialize = initiallize
if __name__ == '__main__':
    # Demo run: fetch a handful of articles by PMID into 'repodir'.
    # Start from a clean state. The abstracts live in the sibling directory
    # 'repodir_ab' (repo_dir + '_ab'), which has to be removed as well:
    # a leftover 'repodir_ab' without 'repodir' makes directory creation in
    # fetch_full_text fail on the next run.
    for stale_dir in ('repodir', 'repodir_ab'):
        if os.path.exists(stale_dir):
            shutil.rmtree(stale_dir)

    # One entry per line: purely numeric lines are PMIDs, any other
    # non-empty line is used as a search keyword.
    strings = """
34536239
7760895
36109602
24766875"""
    entries = [k.strip() for k in strings.split('\n')]
    pmids = [k for k in entries if k.isdigit()]
    print(pmids)
    keys = [k for k in entries if not k.isdigit() and k != '']
    print(keys)

    article_finder = ArticleRetrieval(keywords=keys, pmids=pmids,
                                      repo_dir='repodir', retmax=5)
    article_finder.initiallize()