Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import os | |
| import time | |
| import re | |
| import requests | |
| import urllib.parse | |
| from PIL import Image | |
| from io import BytesIO | |
| from openai import OpenAI | |
# ------------------ Authentication ------------------
# NOTE(security): these credentials are stored in plaintext in source control.
# They are kept here only to preserve current behavior; they should be moved to
# a secrets store (environment variables / Streamlit secrets) and hashed.
VALID_USERS = {
    "andrew@lortechnologies.com": "Pass.123",
    "asherS@schlagergroup.com.au": "Pass.123",
    "daniel@schlagergroup.com.au": "Pass.123",
    "admin@schlagergroup.com.au": "Pass.123",
}

# Lookup table keyed by lower-cased email, so that the mixed-case entries in
# VALID_USERS still match however the user capitalizes the address.
_USERS_BY_EMAIL = {address.lower(): secret for address, secret in VALID_USERS.items()}


def is_valid_login(email: str, password: str) -> bool:
    """Return True when *email* / *password* match an entry in VALID_USERS.

    The email is compared case-insensitively and with surrounding whitespace
    removed; the password must match exactly. Unknown or empty emails are
    always rejected.
    """
    normalized = (email or "").strip().lower()
    expected = _USERS_BY_EMAIL.get(normalized)
    return expected is not None and expected == password


def login() -> None:
    """Render the login form and mark the session authenticated on success.

    On a successful login the script is rerun so the rest of the app renders;
    otherwise an error message is shown below the form.
    """
    st.title("π Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        if is_valid_login(email, password):
            st.session_state.authenticated = True
            st.rerun()
        else:
            st.error("β Incorrect email or password.")
# Every new browser session starts out unauthenticated.
if "authenticated" not in st.session_state:
    st.session_state["authenticated"] = False

# Gate: until the user has logged in, render only the login form and halt the
# script so nothing below this point is executed.
is_authenticated = bool(st.session_state["authenticated"])
if not is_authenticated:
    login()
    st.stop()
# ------------------ App Configuration ------------------
st.set_page_config(
    page_title="AI Pathology Assistant",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.title("𧬠AI Pathology Assistant")
st.caption("AI-powered exploration of pathology, anatomy, and histology documents via OCR + GPT")

# ------------------ Load OpenAI API Key ------------------
# The key is read from the environment; without it the app cannot talk to the
# API, so the script halts after showing an error.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("β Missing OPENAI_API_KEY environment variable.")
    st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)

# ------------------ Assistant Configuration ------------------
ASSISTANT_ID = "asst_jXDSjCG8LI4HEaFEcjFVq8KB"

# ------------------ Session State ------------------
# Seed each per-session key exactly once; values already present (from an
# earlier rerun of this session) are left untouched.
for state_key, initial_value in (
    ("messages", []),
    ("thread_id", None),
    ("image_urls", []),
    ("pending_prompt", None),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = initial_value
# ------------------ Sidebar ------------------
actions = [
    "Select an action...",
    "List histological features of inflammation",
    "Summarize features of carcinoma",
    "List muscle types and features",
    "Extract diagnostic markers",
    "Summarize embryology stages",
]


def _queue_section_lookup() -> None:
    """Queue a section-summary prompt once, then clear the input.

    Runs as the text input's ``on_change`` callback. Queuing from a callback
    (instead of testing the widget value on every script run) prevents the same
    prompt from being resubmitted on each rerun while the field still holds
    text.
    """
    section = st.session_state.section_lookup.strip()
    if section:
        st.session_state.pending_prompt = f"Summarize or list key points from section: {section}"
    st.session_state.section_lookup = ""


def _queue_common_action() -> None:
    """Queue the selected canned query once, then reset the select box.

    Runs as the select box's ``on_change`` callback; resetting to the
    placeholder lets the user pick the same action again later.
    """
    action = st.session_state.common_action
    if action != actions[0]:
        st.session_state.pending_prompt = action
    st.session_state.common_action = actions[0]


with st.sidebar:
    st.header("π§ͺ Pathology Tools")

    # Reset the whole conversation, including the remote thread reference.
    if st.button("π§Ή Clear Chat"):
        st.session_state.messages = []
        st.session_state.thread_id = None
        st.session_state.image_urls = []
        st.session_state.pending_prompt = None
        st.rerun()

    # Read by the image preview column further down the script.
    show_image = st.toggle("πΈ Show Slide Images", value=True)

    # A button is only True on the run triggered by its click, so this prompt
    # is queued exactly once per click.
    keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
    if st.button("π Search") and keyword:
        st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

    st.text_input(
        "Section Lookup",
        placeholder="e.g. Connective Tissue",
        key="section_lookup",
        on_change=_queue_section_lookup,
    )

    st.selectbox(
        "Common Pathology Queries",
        actions,
        key="common_action",
        on_change=_queue_common_action,
    )
# ------------------ Main Chat UI ------------------
# Run states in which the run has ended and will never change again.
_RUN_FINISHED_STATES = frozenset({"completed", "failed", "cancelled", "expired", "incomplete"})
# Upper bound on how long a single run is polled before giving up.
_RUN_TIMEOUT_SECONDS = 180
# Raw GitHub image URLs the assistant may cite in its reply.
_IMAGE_URL_PATTERN = re.compile(
    r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s\n"]+\.png'
)


def _queue_user_message(text: str) -> None:
    """Append a user message to the history and mark it as awaiting a reply."""
    st.session_state.messages.append({"role": "user", "content": text})
    st.session_state.awaiting_reply = True


def _ask_assistant(prompt: str):
    """Send *prompt* to the assistant and return its reply text.

    Creates the thread on first use, posts the prompt, starts a run and polls
    it once per second. Returns the text of the newest assistant message
    produced by that run, or ``None`` when the run did not complete (failed,
    expired, stalled waiting for tool output, or timed out) or produced no
    text. API errors propagate to the caller.
    """
    if st.session_state.thread_id is None:
        st.session_state.thread_id = client.beta.threads.create().id
    thread_id = st.session_state.thread_id

    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=prompt)
    run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=ASSISTANT_ID)

    deadline = time.monotonic() + _RUN_TIMEOUT_SECONDS
    with st.spinner("π¬ Analyzing..."):
        while True:
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            # "requires_action" can never progress here because this app does
            # not submit tool outputs, so stop polling on it as well.
            if (
                run.status in _RUN_FINISHED_STATES
                or run.status == "requires_action"
                or time.monotonic() >= deadline
            ):
                break
            time.sleep(1)

    if run.status not in _RUN_FINISHED_STATES:
        # A run that is still active blocks new messages on the thread, so
        # cancel it before reporting the failure.
        client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run.id)
        return None
    if run.status != "completed":
        return None

    # Newest first: the reply to this prompt is the most recent assistant
    # message belonging to this run, not the oldest one in the thread.
    listing = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in listing.data:
        if message.role != "assistant" or message.run_id != run.id:
            continue
        text_parts = [part.text.value for part in message.content if part.type == "text"]
        if text_parts:
            return "\n\n".join(text_parts)
    return None


chat_col, image_col = st.columns([2, 1])
with chat_col:
    st.markdown("### π¬ Ask a Pathology-Specific Question")
    user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")

    # Typed input wins over a prompt queued from the sidebar.
    if user_input:
        _queue_user_message(user_input)
    elif st.session_state.pending_prompt:
        _queue_user_message(st.session_state.pending_prompt)
        st.session_state.pending_prompt = None

    history = st.session_state.messages
    if st.session_state.get("awaiting_reply") and history and history[-1]["role"] == "user":
        # Consume the flag before calling the API so a failed request is
        # reported once instead of being re-sent on every later rerun.
        st.session_state.awaiting_reply = False
        reply = None
        try:
            reply = _ask_assistant(history[-1]["content"])
        except Exception as e:  # UI boundary: surface any API/network error to the user
            st.error(f"β Error: {e}")
        else:
            if reply is None:
                st.error("β Assistant failed to respond.")

        if reply is not None:
            st.session_state.messages.append({"role": "assistant", "content": reply})
            # Extract raw GitHub image URLs
            st.session_state.image_urls = _IMAGE_URL_PATTERN.findall(reply)
            st.rerun()

    # NOTE(security): unsafe_allow_html renders user and model text as raw
    # HTML. Kept because replies may embed HTML; review before exposing the
    # app to untrusted users.
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"], unsafe_allow_html=True)
# ------------------ Image Preview (Smart URL Encoding) ------------------
_GITHUB_RAW_BASE = "https://raw.githubusercontent.com/"


def _encode_github_raw_url(raw_url: str) -> str:
    """Return *raw_url* with its path percent-encoded for fetching.

    Only raw.githubusercontent.com URLs are rewritten; any other URL is
    returned unchanged. "%" is treated as safe so escapes already present in
    the URL (e.g. "%20") are preserved instead of being double-encoded into
    "%2520", which would make the request miss the file.
    """
    if not raw_url.startswith(_GITHUB_RAW_BASE):
        return raw_url
    path = raw_url[len(_GITHUB_RAW_BASE):]
    return _GITHUB_RAW_BASE + urllib.parse.quote(path, safe="/%")


with image_col:
    if show_image and st.session_state.image_urls:
        st.markdown("### πΌοΈ Slide Previews")
        for raw_url in st.session_state.image_urls:
            try:
                encoded_url = _encode_github_raw_url(raw_url)
                r = requests.get(encoded_url, timeout=5)
                r.raise_for_status()
                # Decoding with PIL first ensures only real images are shown.
                img = Image.open(BytesIO(r.content))
                st.image(img, caption=f"π· {encoded_url.split('/')[-1]}", use_container_width=True)
            except Exception:
                # Deliberate best-effort: a missing or undecodable image must
                # not break the chat, so it is skipped without any message.
                continue