"""Gradio demo listing variants shared by two diseases and their genes.

The shared variants are computed in R (conjunctional conditional FDR on two
GWAS summary-statistics files) and each variant is then mapped to the genes
it lies in through the EBI GWAS Catalog REST API.
"""

import gradio as gr
import pandas as pd
import requests
import rpy2.robjects as robjects

# Seconds to wait on one GWAS Catalog request before giving up on that SNP.
# Without a timeout a stalled connection would block the UI callback forever.
REQUEST_TIMEOUT_SECONDS = 30


def get_SNP():
    """Run the R cFDR pipeline and return the selected marker identifiers.

    The embedded R script:

    * reads the two summary-statistics files (paths are hard-coded and
      resolved relative to R's working directory),
    * keeps rows whose p-value is below 1e-5 in each file,
    * inner-joins the two tables on the ``Marker`` column,
    * computes the conditional FDR in both directions with ``cFDR`` and
      takes the row-wise maximum as ``ccFDR``,
    * keeps rows with ``ccFDR < 0.001`` and stores their ``Marker`` values
      in the R global variable ``AD_AMDF_rsid``.

    Note that the script starts with ``rm(list = ls())``, which clears the
    global environment of the embedded R session.

    Returns:
        list: The elements of ``AD_AMDF_rsid`` converted to a Python list.
    """
    r_code = """
    library(GWAScFDR)
    library(dplyr)
    library(data.table)
    rm(list = ls())
    AD <- fread("Lambert/IGAP_summary_statistics/IGAP_stage_1.txt", header = TRUE, stringsAsFactors = F)
    AMD_F <- fread("Fritsche/Fritsche-26691988.txt.gz", header = TRUE, stringsAsFactors = F)
    AD <- AD[as.numeric(AD$Pvalue) < 0.00001, ]
    AMD_F <- AMD_F[as.numeric(AMD_F$GC.Pvalue) < 0.00001, ]
    names(AD)[c(3, 8)] <- c("Marker", "p_AD")
    names(AMD_F)[c(1, 8)] <- c("Marker", "p_AMDF")
    # merge data
    AD_AMDF <- merge(AD, AMD_F, by = "Marker", all = FALSE)
    # calculate cFDR
    AD_AMDF$FDR_AD_AMD <- cFDR(as.numeric(AD_AMDF$p_AD), as.numeric(AD_AMDF$p_AMDF))
    AD_AMDF$FDR_AMD_AD <- cFDR(as.numeric(AD_AMDF$p_AMDF), as.numeric(AD_AMDF$p_AD))
    # read package to calculate ccFDR (the bigher one)
    AD_AMDF$ccFDR <- pmax(AD_AMDF$FDR_AD_AMD, AD_AMDF$FDR_AMD_AD)
    AD_AMDF <- AD_AMDF[AD_AMDF$ccFDR < 0.001, ]
    # write.csv(AD_AMDF, file = "ccFDR_AD_AMDF.csv")
    AD_AMDF_rsid <- AD_AMDF$Marker
    """
    robjects.r(r_code)
    rsidlist = list(robjects.globalenv["AD_AMDF_rsid"])
    return rsidlist


def get_SNP_gene(rsidlist, timeout=REQUEST_TIMEOUT_SECONDS):
    """Look up, for each rsID, the genes that contain the variant.

    Each identifier is queried against the GWAS Catalog
    ``singleNucleotidePolymorphisms`` endpoint. Only genomic contexts whose
    ``distance`` is 0 are kept.

    Args:
        rsidlist: Iterable of rsID strings.
        timeout: Seconds to wait for each HTTP request.

    Returns:
        dict: Maps each successfully resolved rsID to an alphabetically
        sorted list of unique gene names (possibly empty). Identifiers whose
        request fails or whose response cannot be parsed are reported on
        stdout and left out of the result.
    """
    rsid_gene = {}
    for rsid in rsidlist:
        url = f"https://www.ebi.ac.uk/gwas/rest/api/singleNucleotidePolymorphisms/{rsid}"
        print(url)
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on requests versions
            # where that error is not a RequestException subclass.
            print(f"Error occurred while fetching data for {rsid}: {str(e)}")
            continue

        try:
            genes = {
                context["gene"]["geneName"]
                for context in data["genomicContexts"]
                if context["distance"] == 0
            }
        except (KeyError, TypeError) as e:
            # One malformed record must not abort the whole batch.
            print(f"Unexpected response format for {rsid}: {e!r}")
            continue

        # Sorted so that the gene later shown for this SNP is the same on
        # every run (set iteration order is not stable for strings).
        rsid_gene[rsid] = sorted(genes)
    return rsid_gene


def todf(disease1, disease2):
    """Build the rsID/gene table shown in the UI.

    The two arguments only gate the computation: the underlying data files
    are fixed inside ``get_SNP`` and do not depend on the selected values.

    Args:
        disease1: Value of the first disease dropdown.
        disease2: Value of the second disease dropdown.

    Returns:
        pandas.DataFrame: Columns ``rsid`` and ``genes``. ``genes`` holds
        the first gene name of the variant's gene list, or the string
        ``"None"`` when the variant has no gene. The frame is empty when
        either disease is missing.
    """
    columns = ['rsid', 'genes']
    if not disease1 or not disease2:
        # Nothing selected: give the Dataframe component an empty table
        # rather than failing on an undefined result.
        return pd.DataFrame(columns=columns)

    rsid_gene = get_SNP_gene(get_SNP())
    rows = [
        (rsid, genes[0] if genes else "None")
        for rsid, genes in rsid_gene.items()
    ]
    return pd.DataFrame(rows, columns=columns)


def visible(identifiers):
    """Return visibility updates when at least one identifier is given.

    ``identifiers`` is split on newlines and empty entries are dropped. If
    anything remains, a 4-tuple of ``update(visible=True)`` results is
    returned; otherwise the function returns ``None``.

    NOTE(review): ``topo_para``, ``network_stats``, ``enrichment`` and
    ``button_download`` are not defined in the visible part of this file and
    nothing here calls this function -- it looks like a leftover from
    another app. Confirm before wiring it to an event.
    """
    identifiers = identifiers.split("\n")
    identifiers = list(filter(None, identifiers))
    if len(identifiers) > 0:
        return (
            topo_para.update(visible=True),
            network_stats.update(visible=True),
            enrichment.update(visible=True),
            button_download.update(visible=True),
        )


# Create and compose the components with gr.Blocks()
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=1):
            # Create the input components
            disease1 = gr.Dropdown(choices=['Alzheimer disease'],
                                   value='Alzheimer disease',
                                   label='Disease 1')
            disease2 = gr.Dropdown(choices=['age-related macular degeneration'],
                                   value='age-related macular degeneration',
                                   label='Disease 2')
            # Create the button component
            with gr.Row():
                # button_clear = gr.Button("Clear", elem_id="clear-button")
                button_input = gr.Button("Submit", elem_id="submit-button")
            # Temporary component
            cache = gr.Textbox(visible=False)
        with gr.Column(scale=3.25, min_width=985):
            rsid_gene_df = gr.Dataframe(label="Variants and Genes:",
                                        elem_id="topo-para",
                                        visible=True)
    # Button listener
    button_input.click(fn=todf, inputs=[disease1, disease2], outputs=rsid_gene_df)

# Run the demo object locally
demo.launch(share=False)