import streamlit as st import requests import os from gliner import GLiNER from streamlit_autorefresh import st_autorefresh import time tok = os.getenv("TOK") #st_autorefresh(interval=10000, key="volter") st.write(tok) def Target_Identification(userinput): model = GLiNER.from_pretrained("Ihor/gliner-biomed-bi-small-v1.0") labels = ["Protein","Mutation"] entities = model.predict_entities(userinput, labels, threshold=0.5) for entity in entities: if entity["label"] == "Protein": return entity["text"] def APP(): tab_map = { 0: "BIO ENGINEERING LAB @newMATTER", } tab_selection = st.pills( "TABS", options=tab_map.keys(), format_func=lambda option: tab_map[option], selection_mode="single", ) def SHOWTABS(): if tab_selection == 0: # Two-column split left_col, right_col = st.columns([0.4, 0.6]) # CSS to make right column sticky st.markdown(""" """, unsafe_allow_html=True) with left_col: option_map = { 0: "@OriginAI Nanobody Engineering:", } selection = st.pills( "BIOLOGICS", options=option_map.keys(), format_func=lambda option: option_map[option], selection_mode="single", ) if selection == 0: st.markdown( "
Nanobody [CANCER targeted]
", unsafe_allow_html=True, ) # Get projects with error handling and caching @st.cache_data(ttl=20) # Cache for 5 minutes to reduce API calls def scan_for_project_availability_cached(user_id): try: request_url = f"https://thexforce-combat-backend.hf.space/{user_id}/projects" response = requests.get( request_url, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {tok}", }, timeout=10 # Add timeout to prevent hanging ) if response.status_code == 200: response_json = response.json() pros = response_json.get("projects", []) # Default to empty list if no projects key project_list = [] if pros: # Check if pros is not None or empty for pro in pros: if isinstance(pro, dict): project_name = pro.get("project") if project_name: # Only add if project name exists project_list.append(project_name) else: if pro: # Only add if pro is not None or empty project_list.append(str(pro)) return project_list else: st.error(f"Failed to fetch projects. Status code: {response.status_code}") return [] except requests.exceptions.RequestException as e: st.error(f"Error fetching projects: {str(e)}") return [] except Exception as e: st.error(f"Unexpected error: {str(e)}") return [] projects = scan_for_project_availability_cached(st.user.email) projectname = None if len(projects) > 0: agree = st.checkbox("new project ?") if agree: #projectname = st.selectbox("Select Project", projects) projectname = st.text_input("Enter project name:") else: projectname = st.selectbox("Select Project", projects) else: projectname = st.text_input("Enter project name:") st.session_state.projectname = projectname with right_col: bio_input = st.chat_input(" Ready for Action ! ") # FIXED: Removed caching and added force_refresh parameter def fetch_ops(force_refresh=False): # Clear cache if force_refresh is True if force_refresh and 'ops_cache' in st.session_state: del st.session_state.ops_cache fetch_url=f"https://thexforce-combat-backend.hf.space/{st.user.email}/{st.session_state.projectname}/individual/experiment" response = requests.get( fetch_url, headers={ "Content-Type": "application/json", "Authorization": f"Bearer {tok}", }, ) return response.json() if "messages" not in st.session_state: st.session_state.messages = [] if len(st.session_state.messages) > 0: for msg in st.session_state.messages: with st.chat_message(msg["role"]): st.markdown(msg["content"]) if bio_input: st.session_state.messages.append({"role": "user", "content": bio_input}) with st.chat_message("user"): st.markdown(bio_input) if st.session_state.projectname in [None, ""]: st.markdown(":orange-badge[โ ๏ธ Set Projectname]") else: identified_target = Target_Identification(bio_input) st.warning(f"TARGET IDENTIFIED IS : {identified_target}") payload = { "uid": st.user.email, "pid": st.session_state.projectname, "target": identified_target or None, "high_level_bio_query": bio_input, } response = requests.post( "https://thexforce-combat-backend.hf.space/application_layer_agent", json=payload, headers={ "Content-Type": "application/json", "Authorization":f"Bearer {tok}", }, ) plan_response = response.json() with st.chat_message("assistant"): st.markdown(plan_response) # FIXED: Use animations while waiting for backend processing with st.spinner("๐งฌ Processing biological data..."): # Try multiple times with short delays between attempts fetch_ops_response = None max_attempts = 6 for attempt in range(max_attempts): fetch_ops_response = fetch_ops(force_refresh=True) # Check if we got data if fetch_ops_response and fetch_ops_response.get("exp"): break # Short delay before next attempt (non-blocking) if attempt < max_attempts - 1: # Don't sleep on last attempt import time time.sleep(0.5) # Much shorter sleep between retries # Show completion animation if fetch_ops_response and fetch_ops_response.get("exp"): st.success("โ Analysis complete!") else: st.info("โณ Operations are still being processed...") # FIXED: Better error handling with animations if fetch_ops_response: st.write("Debug - Full response:", fetch_ops_response) # Debug line - remove in production exp_data = fetch_ops_response.get("exp") if exp_data is not None and len(exp_data) > 0: # Animated success message st.balloons() # Celebratory animation st.success(f"๐ฌ Found {len(exp_data)} experimental operations!") for i, op in enumerate(exp_data): with st.chat_message("assistant"): # Add animated typing effect with progress progress_placeholder = st.empty() progress_placeholder.info(f"๐ Loading operation {i+1}/{len(exp_data)}...") operation_text = op.get("operation", "No operation text") output_text = op.get("output", "No output text") # Clear progress and show content progress_placeholder.empty() st.markdown(f"**๐งช Operation:** {operation_text}") st.markdown(f"**๐ Output:** {output_text}") # Small delay between operations for visual flow import time time.sleep(0.2) else: # Animated waiting message st.info("๐ No experimental data found yet. Operations may still be processing...") with st.container(): st.markdown("""