from Bio import Entrez, Medline
# import model
import mtdna_classifier
from NER.html import extractHTML
import data_preprocess
import pipeline
# Setup
DEFAULT_OUTPUTS = {"authors": "unknown",
                   "institution": "unknown",
                   "isolate": "unknown",
                   "definition": "unknown",
                   "title": "unknown",
                   "seq_comment": "unknown",
                   "collection_date": "unknown"}

def fetch_ncbi(accession_number):
    try:
        Entrez.email = "your.email@example.com"  # Required by NCBI, REPLACE WITH YOUR EMAIL
        handle = Entrez.efetch(db="nucleotide", id=str(accession_number), rettype="gb", retmode="xml")
        record = Entrez.read(handle)
        handle.close()
        outputs = dict(DEFAULT_OUTPUTS)  # other date fields: 'GBSeq_update-date', 'GBSeq_create-date'
        gb_seq = None
        # Validate record structure: it should be a list with at least one element (a dict)
        if isinstance(record, list) and len(record) > 0:
            if isinstance(record[0], dict):
                gb_seq = record[0]
            else:
                print(f"Warning: record[0] is not a dictionary for {accession_number}. Type: {type(record[0])}")
        else:
            print(f"Warning: No valid record or empty record list from NCBI for {accession_number}.")
        # If gb_seq is still None, return defaults before touching it
        if gb_seq is None:
            return dict(DEFAULT_OUTPUTS)
        # extract collection date (prefer the create date, fall back to the update date)
        if "GBSeq_create-date" in gb_seq and outputs["collection_date"] == "unknown":
            outputs["collection_date"] = gb_seq["GBSeq_create-date"]
        elif "GBSeq_update-date" in gb_seq and outputs["collection_date"] == "unknown":
            outputs["collection_date"] = gb_seq["GBSeq_update-date"]
        # extract definition
        if "GBSeq_definition" in gb_seq and outputs["definition"] == "unknown":
            outputs["definition"] = gb_seq["GBSeq_definition"]
        # extract reference-related fields
        if "GBSeq_references" in gb_seq:
            for ref in gb_seq["GBSeq_references"]:
                # extract authors
                if "GBReference_authors" in ref and outputs["authors"] == "unknown":
                    outputs["authors"] = " and ".join(ref["GBReference_authors"])
                # extract title
                if "GBReference_title" in ref and outputs["title"] == "unknown":
                    outputs["title"] = ref["GBReference_title"]
                # extract the submission journal (carries the submitting institution)
                if "GBReference_journal" in ref and outputs["institution"] == "unknown":
                    outputs["institution"] = ref["GBReference_journal"]
        # extract seq_comment
        if "GBSeq_comment" in gb_seq and outputs["seq_comment"] == "unknown":
            outputs["seq_comment"] = gb_seq["GBSeq_comment"]
        # extract isolate from the feature-table qualifiers
        if "GBSeq_feature-table" in gb_seq:
            if "GBFeature_quals" in gb_seq["GBSeq_feature-table"][0]:
                for qual in gb_seq["GBSeq_feature-table"][0]["GBFeature_quals"]:
                    if qual["GBQualifier_name"] == "isolate" and outputs["isolate"] == "unknown":
                        outputs["isolate"] = qual["GBQualifier_value"]
        return outputs
    except Exception as e:
        print(f"Error fetching NCBI data for {accession_number}: {e}")
        return dict(DEFAULT_OUTPUTS)
# Fallback if NCBI crashed or the accession cannot be found on NCBI
def google_accession_search(accession_id):
    """
    Search for metadata by accession ID using Google Custom Search.
    Falls back to known biological databases and archives.
    """
    queries = [
        f"{accession_id}",
        f"{accession_id} site:ncbi.nlm.nih.gov",
        f"{accession_id} site:pubmed.ncbi.nlm.nih.gov",
        f"{accession_id} site:europepmc.org",
        f"{accession_id} site:researchgate.net",
        f"{accession_id} mtDNA",
        f"{accession_id} mitochondrial DNA"
    ]
    links = []
    for query in queries:
        search_results = mtdna_classifier.search_google_custom(query, 2)
        for link in search_results:
            if link not in links:
                links.append(link)
    return links
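# A minimal usage sketch (assumes mtdna_classifier.search_google_custom is
# configured with valid Google Custom Search credentials; the accession is
# hypothetical):
def _example_google_accession_search():
    for link in google_accession_search("AB123456"):  # hypothetical accession
        print(link)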
# Method 1: Smarter Google
def smart_google_queries(metadata: dict):
    queries = []
    # Extract useful fields
    isolate = metadata.get("isolate")
    author = metadata.get("authors")
    institution = metadata.get("institution")
    title = metadata.get("title")
    # Construct queries
    if isolate and isolate != "unknown" and isolate != "Unpublished":
        queries.append(f'"{isolate}" mitochondrial DNA')
        queries.append(f'"{isolate}" site:ncbi.nlm.nih.gov')
    if author and author != "unknown" and author != "Unpublished":
        try:
            author_name = author.split(',')[0]  # Use the last name only
        except Exception:
            author_name = author
        queries.append(f'"{author_name}" mitochondrial DNA')
        queries.append(f'"{author_name}" mtDNA site:researchgate.net')
    if institution and institution != "unknown" and institution != "Unpublished":
        try:
            short_inst = ",".join(institution.split(',')[:2])  # Take the first two comma-separated parts
        except Exception:
            short_inst = institution
        queries.append(f'"{short_inst}" mtDNA sequence')
        # queries.append(f'"{short_inst}" isolate site:nature.com')
    if title and title != "unknown" and title != "Unpublished":
        if title != "Direct Submission":
            queries.append(title)
    return queries
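# A minimal sketch of the queries this produces; the metadata values below are
# invented for illustration:
def _example_smart_google_queries():
    meta = {"isolate": "XYZ123",
            "authors": "Doe,J. and Smith,A.",
            "institution": "Submitted (01-JAN-2020) Dept. of Biology, Example University",
            "title": "Direct Submission"}
    for q in smart_google_queries(meta):
        print(q)
    # e.g. '"XYZ123" mitochondrial DNA', '"Doe" mitochondrial DNA', ...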
def filter_links_by_metadata(search_results, saveLinkFolder, accession=None, stop_flag=None):
    TRUSTED_DOMAINS = [
        "ncbi.nlm.nih.gov",
        "pubmed.ncbi.nlm.nih.gov",
        "pmc.ncbi.nlm.nih.gov",
        "biorxiv.org",
        "researchgate.net",
        "nature.com",
        "sciencedirect.com"
    ]
    if stop_flag is not None and stop_flag.value:
        print(f"Stop detected for {accession}, aborting early...")
        return []

    def is_trusted_link(link):
        for domain in TRUSTED_DOMAINS:
            if domain in link:
                return True
        return False

    def is_relevant_title_snippet(link, saveLinkFolder, accession=None):
        output = []
        keywords = ["mtDNA", "mitochondrial", "accession", "isolate", "Homo sapiens", "sequence"]
        if accession:
            keywords = [accession] + keywords
        title_snippet = link.lower()
        print("save link folder inside this filter function: ", saveLinkFolder)
        # Extract the article text with a timeout so one slow link cannot hang the pipeline
        success_process, output_process = pipeline.run_with_timeout(
            data_preprocess.extract_text, args=(link, saveLinkFolder), timeout=60)
        if stop_flag is not None and stop_flag.value:
            print(f"Stop detected for {accession}, aborting early...")
            return []
        if success_process:
            article_text = output_process
            print("succeeded in getting article text")
        else:
            print("did not succeed, falling back to empty article text")
            article_text = ""
        if stop_flag is not None and stop_flag.value:
            print(f"Stop detected for {accession}, aborting early...")
            return []
        # For HTML pages, also collect any supplementary-material links
        try:
            ext = link.split(".")[-1].lower()
            if ext not in ["pdf", "docx", "xlsx"]:
                html = extractHTML.HTML("", link)
                if stop_flag is not None and stop_flag.value:
                    print(f"Stop detected for {accession}, aborting early...")
                    return []
                jsonSM = html.getSupMaterial()
                if jsonSM:
                    output += sum((jsonSM[key] for key in jsonSM), [])
        except Exception:
            pass  # continue silently
        # Return on the first keyword hit, recording which keyword matched
        for keyword in keywords:
            if keyword.lower() in article_text.lower():
                output.append([link, keyword.lower()])
                print("link and keyword for article text: ", link, keyword)
                return output
            if keyword.lower() in title_snippet:
                output.append([link, keyword.lower()])
                print("link and keyword for title: ", link, keyword)
                return output
        return output

    filtered = []
    better_filter = []
    if len(search_results) > 0:
        for link in search_results:
            # if is_trusted_link(link):
            #     if link not in filtered:
            #         filtered.append(link)
            # else:
            print(link)
            if stop_flag is not None and stop_flag.value:
                print(f"Stop detected for {accession}, aborting early...")
                return []
            if link:
                output_link = is_relevant_title_snippet(link, saveLinkFolder, accession)
                print("output link: ")
                print(output_link)
                for out_link in output_link:
                    if isinstance(out_link, list) and len(out_link) > 1:
                        print(out_link)
                        kw = out_link[1]
                        print("kw and acc: ", kw, accession.lower() if accession else None)
                        # A match on the accession itself is the strongest signal
                        if accession and kw == accession.lower():
                            better_filter.append(out_link[0])
                        filtered.append(out_link[0])
                    else:
                        filtered.append(out_link)
        print("done with links and here is the filter: ", filtered)
    # Prefer links that matched the accession directly, if any
    if better_filter:
        filtered = better_filter
    return filtered
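# A minimal usage sketch (folder path and accession are hypothetical; assumes
# data_preprocess.extract_text can fetch and read each link):
def _example_filter_links_by_metadata():
    links = ["https://pubmed.ncbi.nlm.nih.gov/00000000/"]  # hypothetical search result
    kept = filter_links_by_metadata(links, "/tmp/links", accession="AB123456")
    print(kept)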
def smart_google_search(metadata):
    queries = smart_google_queries(metadata)
    links = []
    for q in queries:
        #print("\nQuery:", q)
        results = mtdna_classifier.search_google_custom(q, 2)
        for link in results:
            #print(f"- {link}")
            if link not in links:
                links.append(link)
    #filter_links = filter_links_by_metadata(links)
    return links
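# A minimal end-to-end sketch chaining the steps above (hypothetical accession;
# requires network access and configured search credentials):
def _example_smart_google_search():
    meta = fetch_ncbi("AB123456")  # hypothetical accession
    for link in smart_google_search(meta):
        print(link)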
# Method 2: Prompt an LLM better, or use a better AI search API, with all the
# information gathered from NCBI and the searches above
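# A hypothetical sketch of what Method 2 could look like: fold the NCBI
# metadata and the collected links into one prompt for an LLM or AI search
# API. build_llm_prompt and its wording are assumptions, not an existing API
# in this module.
def build_llm_prompt(metadata: dict, links: list) -> str:
    lines = ["Identify the sample origin for this mtDNA record."]
    for key, value in metadata.items():
        if value and value != "unknown":
            lines.append(f"{key}: {value}")
    if links:
        lines.append("Candidate sources:")
        lines.extend(f"- {link}" for link in links)
    return "\n".join(lines)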