# Source: Hugging Face Space (status when captured: Sleeping)
import base64
import contextlib
import os
import tempfile
import time

import google.generativeai as genai
import PyPDF2
import requests  # Added for Hugging Face API requests
import streamlit as st
from dotenv import load_dotenv
from google.api_core import exceptions
from gtts import gTTS
from PIL import Image
def _require_env(var_name, missing_message):
    """Return the value of environment variable *var_name*.

    If the variable is unset or empty, *missing_message* is shown with
    ``st.error`` and ``st.stop()`` is called.
    """
    value = os.getenv(var_name)
    if not value:
        st.error(missing_message)
        st.stop()
    return value


load_dotenv()

# Gemini model used for text (PDF) report analysis.
gemini_api_key = _require_env(
    "GEMINI_API_KEY",
    "Gemini API key not found. Please set the GEMINI_API_KEY environment variable.",
)
genai.configure(api_key=gemini_api_key)
gemini_model = genai.GenerativeModel('gemini-1.5-flash')

# Hugging Face endpoint used for image report analysis.
huggingface_api_key = _require_env(
    "HUGGINGFACE_API_KEY",
    "Hugging Face API key not found. Please set the HUGGINGFACE_API_KEY environment variable.",
)
HUGGINGFACE_API_URL = _require_env(
    "HUGGINGFACE_API_URL",
    "Hugging Face API URL not found. Please set the HUGGINGFACE_API_URL environment variable.",
)

# Retry policy used by both analysis functions.
MAX_RETRIES = 3
RETRY_DELAY = 2  # seconds

# Display name -> language code, used for prompt selection and text-to-speech.
LANGUAGES = {
    "English": "en",
    "Spanish": "es",
    "French": "fr",
    "German": "de",
    "Italian": "it",
    "Portuguese": "pt",
    "Urdu": "ur",
}
def analyze_text_report(content, lang):
    """Ask Gemini for a concise analysis of a text medical report.

    Args:
        content: Plain text of the report.
        lang: Language code selecting the prompt language; codes without a
            translated prompt use the English prompt.

    Returns:
        The model's analysis text. If the API keeps failing after
        ``MAX_RETRIES`` attempts, or the response carries no usable text,
        the result of ``fallback_analysis(content, "text")`` is returned.
    """
    prompts = {
        "en": "Analyze this medical report concisely. Provide key findings, diagnoses, and recommendations:",
        "es": "Analiza este informe médico de manera concisa. Proporcione hallazgos clave, diagnósticos y recomendaciones:",
        "fr": "Analysez ce rapport médical de manière concise. Fournissez les résultats clés, les diagnostics et les recommandations :",
        "de": "Analysieren Sie diesen medizinischen Bericht kurz und prägnant. Geben Sie wichtige Ergebnisse, Diagnosen und Empfehlungen an:",
        "it": "Analizza questo rapporto medico in modo conciso. Fornisci risultati chiave, diagnosi e raccomandazioni:",
        "pt": "Analise este relatório médico de forma concisa. Forneça os principais resultados, diagnósticos e recomendações:",
        "ur": "اس طبی رپورٹ کا مختصر تجزیہ کریں۔ اہم نتائج، تشخیصات، اور سفارشات فراہم کریں:",
    }
    prompt = prompts.get(lang, prompts["en"])

    for attempt in range(MAX_RETRIES):
        try:
            response = gemini_model.generate_content(f"{prompt}\n\n{content}")
        except exceptions.GoogleAPIError as e:
            if attempt < MAX_RETRIES - 1:
                st.warning(f"An error occurred. Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})")
                time.sleep(RETRY_DELAY)
                continue
            st.error(f"Failed to analyze the report after {MAX_RETRIES} attempts. Error: {str(e)}")
            return fallback_analysis(content, "text")

        try:
            # The .text accessor raises ValueError when the response holds no
            # usable candidate (for example when generation was blocked).
            return response.text
        except ValueError as e:
            st.error(f"The model returned no usable text for this report. Error: {e}")
            return fallback_analysis(content, "text")

    # Only reachable when MAX_RETRIES is configured as 0 or less; never
    # hand None back to the caller.
    return fallback_analysis(content, "text")
def _format_classification_result(result):
    """Render a decoded API result as ``"<label>: <score>"`` lines.

    Returns None when *result* is neither a list of dicts nor a dict.
    Numeric values are shown with two decimals; anything else is shown
    via ``str()`` instead of failing on the ``.2f`` format.
    """
    def _fmt(value):
        if isinstance(value, (int, float)):
            return f"{value:.2f}"
        return str(value)

    if isinstance(result, list):
        lines = []
        for item in result:
            if not isinstance(item, dict):
                return None
            lines.append(f"{item.get('label', 'Unknown')}: {_fmt(item.get('score', 0))}")
    elif isinstance(result, dict):
        lines = [f"{key}: {_fmt(value)}" for key, value in result.items()]
    else:
        return None
    return "".join(f"{line}\n" for line in lines)


def analyze_image_report(image_path, lang, *, timeout=60):
    """Send a report image to the Hugging Face endpoint and format the result.

    Args:
        image_path: Path of the image file to upload.
        lang: Language code; accepted for symmetry with
            ``analyze_text_report`` but not used by this function.
        timeout: Seconds to wait for each HTTP request before treating the
            attempt as failed.

    Returns:
        One ``"<label>: <score>"`` line per prediction, or the result of
        ``fallback_analysis(None, "image")`` when the file cannot be read,
        every attempt fails, or the response has no usable predictions.
    """
    headers = {
        "Authorization": f"Bearer {huggingface_api_key}",
        "Content-Type": "application/octet-stream"
    }

    # The payload is the same for every attempt, so read it once. A file
    # that cannot be read will not become readable by retrying.
    try:
        with open(image_path, "rb") as img_file:
            image_data = img_file.read()
    except OSError as e:
        st.error(f"Failed to read the image file. Error: {str(e)}")
        return fallback_analysis(None, "image")

    for attempt in range(MAX_RETRIES):
        is_last_attempt = attempt >= MAX_RETRIES - 1
        try:
            response = requests.post(
                HUGGINGFACE_API_URL, headers=headers, data=image_data, timeout=timeout
            )
            # Decode inside the try so an invalid JSON body counts as a
            # failed attempt.
            result = response.json() if response.status_code == 200 else None
        except (requests.RequestException, ValueError) as e:
            if is_last_attempt:
                st.error(f"Failed to analyze the image after {MAX_RETRIES} attempts. Error: {str(e)}")
                break
            st.warning(f"An error occurred: {str(e)}. Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})")
            time.sleep(RETRY_DELAY)
            continue

        if response.status_code == 200:
            analysis = _format_classification_result(result)
            if not analysis:
                # Unrecognised shape or an empty prediction list.
                st.warning("Unexpected response format from Hugging Face API.")
                return fallback_analysis(None, "image")
            return analysis

        if response.status_code == 503:
            # Model is loading. Do not wait when no attempt is left.
            if is_last_attempt:
                st.error(f"Failed to analyze the image after {MAX_RETRIES} attempts.")
                break
            st.warning("Model is loading. Waiting for 30 seconds before retrying...")
            time.sleep(30)
            continue

        st.warning(f"Hugging Face API returned status code {response.status_code}: {response.text}")
        if is_last_attempt:
            st.error(f"Failed to analyze the image after {MAX_RETRIES} attempts.")
            break
        st.warning(f"Retrying in {RETRY_DELAY} seconds... (Attempt {attempt + 1}/{MAX_RETRIES})")
        time.sleep(RETRY_DELAY)

    # Every exit from the loop without a result ends here, so the caller
    # always receives a string.
    return fallback_analysis(None, "image")
def fallback_analysis(content, content_type):
    """Return a canned result for use when the AI services are unavailable.

    A Streamlit warning is shown first. For ``content_type == "image"`` a
    fixed message is returned; for any other type a short Markdown summary
    is returned that includes the whitespace-separated word count of
    *content* (0 when *content* is empty or None).
    """
    st.warning("Using fallback analysis method due to API issues.")

    if content_type != "image":
        word_count = 0
        if content:
            word_count = len(content.split())
        # The empty first and last entries give the leading and trailing
        # newline of the returned text.
        summary_lines = (
            "",
            "**Fallback Analysis:**",
            "1. **Document Type:** Text-based medical report",
            f"2. **Word Count:** Approximately {word_count} words",
            "3. **Content:** The document appears to contain medical information, but detailed analysis is unavailable due to technical issues.",
            "4. **Recommendation:** Please review the document manually or consult with a healthcare professional for accurate interpretation.",
            "5. **Note:** This is a simplified analysis due to temporary unavailability of the AI service. For a comprehensive analysis, please try again later.",
            "",
        )
        return "\n".join(summary_lines)

    return "Unable to analyze the image due to API issues. Please try again later or consult a medical professional for accurate interpretation."
def extract_text_from_pdf(pdf_file):
    """Extract the text of every page of a PDF.

    Args:
        pdf_file: Binary file object (or anything ``PyPDF2.PdfReader``
            accepts) containing the PDF.

    Returns:
        The text of all pages that yielded any text, joined with newlines
        so the last word of one page does not fuse with the first word of
        the next. Returns an empty string when no page yields text.
    """
    pdf_reader = PyPDF2.PdfReader(pdf_file)
    page_texts = (page.extract_text() for page in pdf_reader.pages)
    return "\n".join(page_text for page_text in page_texts if page_text)
def generate_tts_audio(text, lang_code):
    """Synthesize *text* to an mp3 file with gTTS.

    Args:
        text: Text to speak.
        lang_code: Language code passed to gTTS as ``lang``.

    Returns:
        Path of the generated ``.mp3`` temporary file. The caller is
        responsible for deleting it.

    Raises:
        Whatever gTTS raises; if synthesis fails after the temporary file
        was created, the file is removed before the error propagates.
    """
    tts = gTTS(text=text, lang=lang_code)
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
    try:
        with tmp_file:
            # Write through the handle that is already open rather than
            # reopening the same file by name.
            tts.write_to_fp(tmp_file)
    except Exception:
        # Do not leave an orphaned temp file behind; a cleanup failure
        # must not hide the original error.
        with contextlib.suppress(OSError):
            os.unlink(tmp_file.name)
        raise
    return tmp_file.name
def audio_player(audio_file_path):
    """Render an HTML ``<audio>`` player for the file at *audio_file_path*.

    The file's bytes are base64-encoded and embedded in the markup as a
    ``data:`` URI, which is passed to ``st.markdown`` with raw HTML enabled.
    """
    with open(audio_file_path, "rb") as source:
        encoded = base64.b64encode(source.read()).decode()

    markup = (
        "\n"
        "<audio controls>\n"
        f'<source src="data:audio/mp3;base64,{encoded}" type="audio/mp3">\n'
        "Your browser does not support the audio element.\n"
        "</audio>\n"
    )
    st.markdown(markup, unsafe_allow_html=True)
def _write_temp_file(data, suffix):
    """Write *data* (bytes) to a new temporary file and return its path."""
    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
        tmp_file.write(data)
        return tmp_file.name


def _remove_quietly(path):
    """Delete *path*, ignoring OS errors such as the file already being gone."""
    with contextlib.suppress(OSError):
        os.unlink(path)


def _present_analysis(analysis, lang_code):
    """Show the analysis text and, when possible, a spoken version of it."""
    st.subheader("Analysis Results:")
    if not analysis or not analysis.strip():
        st.warning("No analysis text is available.")
        return
    st.write(analysis)

    # Audio is an extra: a text-to-speech failure must not take down the
    # page after the written analysis has been shown.
    try:
        audio_path = generate_tts_audio(analysis, lang_code)
    except Exception as e:
        st.warning(f"Audio feedback is unavailable: {e}")
        return

    try:
        st.write("Listen to the analysis:")
        audio_player(audio_path)
    finally:
        # The player embeds the audio bytes in the page, so the file is no
        # longer needed once it has been rendered.
        _remove_quietly(audio_path)


def main():
    """Streamlit entry point: upload a report, analyze it, show and speak the result."""
    st.title("AI-driven Medical Report Analyzer with Multilingual Audio Feedback")
    st.write("Upload a medical report (image or PDF) for analysis")

    # Language selection
    language = st.selectbox("Select language for analysis and audio feedback:", list(LANGUAGES.keys()))
    lang_code = LANGUAGES[language]

    file_type = st.radio("Select file type:", ("Image", "PDF"))

    if file_type == "Image":
        uploaded_file = st.file_uploader("Choose a medical report image", type=["jpg", "jpeg", "png"])
        if uploaded_file is None:
            return

        tmp_file_path = _write_temp_file(uploaded_file.getvalue(), '.png')
        try:
            # Close the image handle before the temp file is deleted.
            with Image.open(tmp_file_path) as image:
                st.image(image, caption="Uploaded Medical Report", use_column_width=True)

            if st.button("Analyze Image Report"):
                with st.spinner("Analyzing the medical report image..."):
                    analysis = analyze_image_report(tmp_file_path, lang_code)
                _present_analysis(analysis, lang_code)
        finally:
            # Runs on every script run, including when a step above raises.
            _remove_quietly(tmp_file_path)
    else:  # PDF
        uploaded_file = st.file_uploader("Choose a medical report PDF", type=["pdf"])
        if uploaded_file is None:
            return

        st.write("PDF uploaded successfully")
        if st.button("Analyze PDF Report"):
            with st.spinner("Analyzing the medical report PDF..."):
                tmp_file_path = _write_temp_file(uploaded_file.getvalue(), '.pdf')
                try:
                    with open(tmp_file_path, 'rb') as pdf_file:
                        pdf_text = extract_text_from_pdf(pdf_file)
                finally:
                    _remove_quietly(tmp_file_path)

                if not pdf_text.strip():
                    # Nothing to analyze; do not send an empty report to
                    # the model.
                    st.warning("No text could be extracted from this PDF. If it is a scanned document, try uploading it as an image instead.")
                    return

                analysis = analyze_text_report(pdf_text, lang_code)
            _present_analysis(analysis, lang_code)


if __name__ == "__main__":
    main()