mtDNALocation / smart_fallback.py
VyLala's picture
Update smart_fallback.py
de2ea69 verified
raw
history blame
10.5 kB
from Bio import Entrez, Medline
#import model
import mtdna_classifier
from NER.html import extractHTML
import data_preprocess
import pipeline
# Setup
def fetch_ncbi(accession_number):
try:
Entrez.email = "your.email@example.com" # Required by NCBI, REPLACE WITH YOUR EMAIL
handle = Entrez.efetch(db="nucleotide", id=str(accession_number), rettype="gb", retmode="xml")
record = Entrez.read(handle)
handle.close()
outputs = {"authors":"unknown",
"institution":"unknown",
"isolate":"unknown",
"definition":"unknown",
"title":"unknown",
"seq_comment":"unknown",
"collection_date":"unknown" } #'GBSeq_update-date': '25-OCT-2023', 'GBSeq_create-date'
gb_seq = None
# Validate record structure: It should be a list with at least one element (a dict)
if isinstance(record, list) and len(record) > 0:
if isinstance(record[0], dict):
gb_seq = record[0]
else:
print(f"Warning: record[0] is not a dictionary for {accession_number}. Type: {type(record[0])}")
# extract collection date
if "GBSeq_create-date" in gb_seq and outputs["collection_date"]=="unknown":
outputs["collection_date"] = gb_seq["GBSeq_create-date"]
else:
if "GBSeq_update-date" in gb_seq and outputs["collection_date"]=="unknown":
outputs["collection_date"] = gb_seq["GBSeq_update-date"]
# extract definition
if "GBSeq_definition" in gb_seq and outputs["definition"]=="unknown":
outputs["definition"] = gb_seq["GBSeq_definition"]
# extract related-reference things
if "GBSeq_references" in gb_seq:
for ref in gb_seq["GBSeq_references"]:
# extract authors
if "GBReference_authors" in ref and outputs["authors"]=="unknown":
outputs["authors"] = "and ".join(ref["GBReference_authors"])
# extract title
if "GBReference_title" in ref and outputs["title"]=="unknown":
outputs["title"] = ref["GBReference_title"]
# extract submitted journal
if 'GBReference_journal' in ref and outputs["institution"]=="unknown":
outputs["institution"] = ref['GBReference_journal']
# extract seq_comment
if 'GBSeq_comment'in gb_seq and outputs["seq_comment"]=="unknown":
outputs["seq_comment"] = gb_seq["GBSeq_comment"]
# extract isolate
if "GBSeq_feature-table" in gb_seq:
if 'GBFeature_quals' in gb_seq["GBSeq_feature-table"][0]:
for ref in gb_seq["GBSeq_feature-table"][0]["GBFeature_quals"]:
if ref['GBQualifier_name'] == "isolate" and outputs["isolate"]=="unknown":
outputs["isolate"] = ref["GBQualifier_value"]
else:
print(f"Warning: No valid record or empty record list from NCBI for {accession_number}.")
# If gb_seq is still None, return defaults
if gb_seq is None:
return {"authors":"unknown",
"institution":"unknown",
"isolate":"unknown",
"definition":"unknown",
"title":"unknown",
"seq_comment":"unknown",
"collection_date":"unknown" }
return outputs
except:
print("error in fetching ncbi data")
return {"authors":"unknown",
"institution":"unknown",
"isolate":"unknown",
"definition":"unknown",
"title":"unknown",
"seq_comment":"unknown",
"collection_date":"unknown" }
# Fallback if NCBI crashed or cannot find accession on NBCI
def google_accession_search(accession_id):
"""
Search for metadata by accession ID using Google Custom Search.
Falls back to known biological databases and archives.
"""
queries = [
f"{accession_id}",
f"{accession_id} site:ncbi.nlm.nih.gov",
f"{accession_id} site:pubmed.ncbi.nlm.nih.gov",
f"{accession_id} site:europepmc.org",
f"{accession_id} site:researchgate.net",
f"{accession_id} mtDNA",
f"{accession_id} mitochondrial DNA"
]
links = []
for query in queries:
search_results = mtdna_classifier.search_google_custom(query, 2)
for link in search_results:
if link not in links:
links.append(link)
return links
# Method 1: Smarter Google
def smart_google_queries(metadata: dict):
queries = []
# Extract useful fields
isolate = metadata.get("isolate")
author = metadata.get("authors")
institution = metadata.get("institution")
title = metadata.get("title")
combined = []
# Construct queries
if isolate and isolate!="unknown" and isolate!="Unpublished":
queries.append(f'"{isolate}" mitochondrial DNA')
queries.append(f'"{isolate}" site:ncbi.nlm.nih.gov')
if author and author!="unknown" and author!="Unpublished":
# try:
# author_name = ".".join(author.split(' ')[0].split(".")[:-1]) # Use last name only
# except:
# try:
# author_name = author.split(',')[0] # Use last name only
# except:
# author_name = author
try:
author_name = author.split(',')[0] # Use last name only
except:
author_name = author
queries.append(f'"{author_name}" mitochondrial DNA')
queries.append(f'"{author_name}" mtDNA site:researchgate.net')
if institution and institution!="unknown" and institution!="Unpublished":
try:
short_inst = ",".join(institution.split(',')[:2]) # Take first part of institution
except:
try:
short_inst = institution.split(',')[0]
except:
short_inst = institution
queries.append(f'"{short_inst}" mtDNA sequence')
#queries.append(f'"{short_inst}" isolate site:nature.com')
if title and title!='unknown' and title!="Unpublished":
if title!="Direct Submission":
queries.append(title)
return queries
def filter_links_by_metadata(search_results, saveLinkFolder, accession=None, stop_flag=None):
TRUSTED_DOMAINS = [
"ncbi.nlm.nih.gov",
"pubmed.ncbi.nlm.nih.gov",
"pmc.ncbi.nlm.nih.gov",
"biorxiv.org",
"researchgate.net",
"nature.com",
"sciencedirect.com"
]
if stop_flag is not None and stop_flag.value:
print(f"πŸ›‘ Stop detected {accession}, aborting early...")
return []
def is_trusted_link(link):
for domain in TRUSTED_DOMAINS:
if domain in link:
return True
return False
def is_relevant_title_snippet(link, saveLinkFolder, accession=None):
output = []
keywords = ["mtDNA", "mitochondrial", "accession", "isolate", "Homo sapiens", "sequence"]
if accession:
keywords = [accession] + keywords
title_snippet = link.lower()
print("save link folder inside this filter function: ", saveLinkFolder)
success_process, output_process = pipeline.run_with_timeout(data_preprocess.extract_text,args=(link,saveLinkFolder),timeout=60)
if stop_flag is not None and stop_flag.value:
print(f"πŸ›‘ Stop detected {accession}, aborting early...")
return []
if success_process:
article_text = output_process
print("yes succeed for getting article text")
else:
print("no suceed, fallback to no link")
article_text = ""
#article_text = data_preprocess.extract_text(link,saveLinkFolder)
print("article text")
#print(article_text)
if stop_flag is not None and stop_flag.value:
print(f"πŸ›‘ Stop detected {accession}, aborting early...")
return []
try:
ext = link.split(".")[-1].lower()
if ext not in ["pdf", "docx", "xlsx"]:
html = extractHTML.HTML("", link)
if stop_flag is not None and stop_flag.value:
print(f"πŸ›‘ Stop detected {accession}, aborting early...")
return []
jsonSM = html.getSupMaterial()
if jsonSM:
output += sum((jsonSM[key] for key in jsonSM), [])
except Exception:
pass # continue silently
for keyword in keywords:
if keyword.lower() in article_text.lower():
if link not in output:
output.append([link,keyword.lower()])
print("link and keyword for article text: ", link, keyword)
return output
if keyword.lower() in title_snippet.lower():
if link not in output:
output.append([link,keyword.lower()])
print("link and keyword for title: ", link, keyword)
return output
return output
filtered = []
better_filter = []
if len(search_results) > 0:
for link in search_results:
# if is_trusted_link(link):
# if link not in filtered:
# filtered.append(link)
# else:
print(link)
if stop_flag is not None and stop_flag.value:
print(f"πŸ›‘ Stop detected {accession}, aborting early...")
return []
if link:
output_link = is_relevant_title_snippet(link,saveLinkFolder, accession)
print("output link: ")
print(output_link)
for out_link in output_link:
if isinstance(out_link,list) and len(out_link) > 1:
print(out_link)
kw = out_link[1]
print("kw and acc: ", kw, accession.lower())
if accession and kw == accession.lower():
better_filter.append(out_link[0])
filtered.append(out_link[0])
else: filtered.append(out_link)
print("done with link and here is filter: ",filtered)
if better_filter:
filtered = better_filter
return filtered
def smart_google_search(metadata):
queries = smart_google_queries(metadata)
links = []
for q in queries:
#print("\nπŸ” Query:", q)
results = mtdna_classifier.search_google_custom(q,2)
for link in results:
#print(f"- {link}")
if link not in links:
links.append(link)
#filter_links = filter_links_by_metadata(links)
return links
# Method 2: Prompt LLM better or better ai search api with all
# the total information from even ncbi and all search