import os
import shutil

for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch", "/root/.cache"]:
    shutil.rmtree(d, ignore_errors=True)

os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface"
os.environ["TORCH_HOME"] = "/tmp/torch"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

import json
import uuid
import datetime
import numpy as np
import torch
import cv2
import joblib
import torch.nn as nn
import torchvision.transforms as transforms
import torchvision.models as models
from io import BytesIO
from PIL import Image as PILImage
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse
import tensorflow as tf
from model_histo import BreastCancerClassifier
from fastapi.staticfiles import StaticFiles
import uvicorn

try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Image as ReportLabImage
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.enums import TA_CENTER, TA_JUSTIFY
    from reportlab.lib.units import inch
    from reportlab.lib.colors import navy, black
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False

from ultralytics import YOLO
from sklearn.preprocessing import MinMaxScaler
from model import MWT as create_model
from augmentations import Augmentations
from huggingface_hub import InferenceClient

# =====================================================
# SETUP TEMP DIRS AND ENV
# =====================================================
for d in ["/tmp/huggingface", "/tmp/Ultralytics", "/tmp/matplotlib", "/tmp/torch"]:
    shutil.rmtree(d, ignore_errors=True)

os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface"
os.environ["TORCH_HOME"] = "/tmp/torch"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["YOLO_CONFIG_DIR"] = "/tmp/Ultralytics"

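# NOTE: huggingface_hub, torch hub, and matplotlib pick up these cache locations when they
# are first imported or used, which is why the same assignments also appear at the very top
# of the file, before any of those libraries are imported.
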
# =====================================================
# HUGGING FACE CLIENT SETUP
# =====================================================
HF_MODEL_ID = "mistralai/Mistral-7B-v0.1"
hf_token = os.getenv("HF_TOKEN")
client = None

if hf_token:
    try:
        client = InferenceClient(model=HF_MODEL_ID, token=hf_token)
        print(f"✅ Hugging Face InferenceClient initialized for {HF_MODEL_ID}")
    except Exception as e:
        print("⚠️ Failed to initialize Hugging Face client:", e)
else:
    print("⚠️ Warning: No HF_TOKEN found — summaries will be skipped.")

def generate_ai_summary(abnormal_cells, normal_cells, avg_confidence):
    """Generate a brief medical interpretation using Mistral."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping summary."
    try:
        total_cells = abnormal_cells + normal_cells
        abnormal_ratio = (abnormal_cells / total_cells * 100) if total_cells > 0 else 0.0
        prompt = f"""Act as a cytopathology expert providing a brief diagnostic interpretation.
Observed Cell Counts:
- {abnormal_cells} Abnormal Cells
- {normal_cells} Normal Cells
- Detection Confidence: {avg_confidence:.1f}%
Write a 2-3 sentence professional medical assessment focusing on:
1. Cell count analysis
2. Abnormality ratio ({abnormal_ratio:.1f}%)
3. Clinical significance
Use objective, scientific language suitable for a pathology report."""
        # Non-streaming call; details=True so the full generated_text is returned
        response = client.text_generation(
            prompt,
            max_new_tokens=200,
            temperature=0.7,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        # Handle different response formats
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        # Fallback summary if response format is unexpected
        return f"Analysis shows {abnormal_cells} abnormal cells ({abnormal_ratio:.1f}%) and {normal_cells} normal cells, with average detection confidence of {avg_confidence:.1f}%."
    except Exception:
        # Provide a structured fallback summary instead of an error message
        total = abnormal_cells + normal_cells
        if total == 0:
            return "No cells were detected in the sample. Consider re-scanning or adjusting detection parameters."
        ratio = (abnormal_cells / total) * 100
        severity = "high" if ratio > 70 else "moderate" if ratio > 30 else "low"
        return f"Quantitative analysis detected {abnormal_cells} abnormal cells ({ratio:.1f}%) among {total} total cells, indicating a {severity} abnormality ratio. Average detection confidence: {avg_confidence:.1f}%."

def generate_mwt_summary(predicted_label, confidences, avg_confidence):
    """Generate a short MWT-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise cytopathology expert. Given an MWT classifier result, write a 1-2 sentence professional interpretation suitable for embedding in a diagnostic report.
Result:
- Predicted label: {predicted_label}
- Confidence (average): {avg_confidence:.1f}%
- Class probabilities: {json.dumps(confidences)}
Provide guidance on the significance of the result and any suggested next steps in plain, objective language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=120,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_label} (avg confidence {avg_confidence:.1f}%)."
    except Exception:
        return f"Quantitative result: {predicted_label} with average confidence {avg_confidence:.1f}%."

def generate_cin_summary(predicted_grade, confidences, avg_confidence):
    """Generate a short CIN-specific interpretation using the HF client when available."""
    if not client:
        return "⚠️ Hugging Face client not initialized — skipping AI interpretation."
    try:
        prompt = f"""
You are a concise gynecologic pathology expert. Given a CIN classifier result, write a 1-2 sentence professional interpretation suitable for a diagnostic report.
Result:
- Predicted grade: {predicted_grade}
- Confidence (average): {avg_confidence:.1f}%
- Class probabilities: {json.dumps(confidences)}
Provide a brief statement about clinical significance and suggested next steps (e.g., further colposcopic evaluation) in objective, clinical language.
"""
        response = client.text_generation(
            prompt,
            max_new_tokens=140,
            temperature=0.2,
            stream=False,
            details=True,
            stop_sequences=["\n\n", "###"]
        )
        if hasattr(response, 'generated_text'):
            return response.generated_text.strip()
        elif isinstance(response, dict):
            return response.get('generated_text', '').strip()
        elif isinstance(response, str):
            return response.strip()
        return f"Result: {predicted_grade} (avg confidence {avg_confidence:.1f}%)."
    except Exception:
        return f"Quantitative result: {predicted_grade} with average confidence {avg_confidence:.1f}%."

# =====================================================
# FASTAPI SETUP
# =====================================================
app = FastAPI(title="Pathora Medical Diagnostic API")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*", "http://localhost:5173", "http://127.0.0.1:5173"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
    expose_headers=["*"]  # Allow access to response headers
)

# Use /tmp for outputs in Hugging Face Spaces (writable directory)
OUTPUT_DIR = os.environ.get("OUTPUT_DIR", "/tmp/outputs")
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Create image outputs dir
IMAGES_DIR = os.path.join(OUTPUT_DIR, "images")
os.makedirs(IMAGES_DIR, exist_ok=True)

app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")

# Mount public sample images from frontend dist (Vite copies public/ to dist/ root)
# Check both possible locations: frontend/dist (Docker) and ../frontend/dist (local dev)
FRONTEND_DIST_CHECK = os.path.join(os.path.dirname(__file__), "frontend/dist")
if not os.path.isdir(FRONTEND_DIST_CHECK):
    FRONTEND_DIST_CHECK = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))

for sample_dir in ["cyto", "colpo", "histo"]:
    sample_path = os.path.join(FRONTEND_DIST_CHECK, sample_dir)
    if os.path.isdir(sample_path):
        app.mount(f"/{sample_dir}", StaticFiles(directory=sample_path), name=sample_dir)
        print(f"✅ Mounted /{sample_dir} from {sample_path}")
    else:
        print(f"⚠️ Sample directory not found: {sample_path}")

# Check that other static assets (logos, banners) are present in the dist root
for static_file in ["banner.jpeg", "white_logo.png", "black_logo.png", "manalife_LOGO.jpg"]:
    static_path = os.path.join(FRONTEND_DIST_CHECK, static_file)
    if os.path.isfile(static_path):
        print(f"✅ Static file available: /{static_file}")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# =====================================================
# MODEL LOADS
# =====================================================
print("🔹 Loading YOLO model...")
yolo_model = YOLO("best2.pt")

print("🔹 Loading MWT model...")
mwt_model = create_model(num_classes=2).to(device)
mwt_model.load_state_dict(torch.load("MWTclass2.pth", map_location=device))
mwt_model.eval()
mwt_class_names = ["Negative", "Positive"]

print("🔹 Loading CIN model...")
try:
    clf = joblib.load("logistic_regression_model.pkl")
except Exception as e:
    print(f"⚠️ CIN classifier not available (logistic_regression_model.pkl missing or invalid): {e}")
    clf = None

yolo_colposcopy = YOLO("yolo_colposcopy.pt")

# =====================================================
# RESNET FEATURE EXTRACTORS FOR CIN
# =====================================================
def build_resnet(model_name="resnet50"):
    if model_name == "resnet50":
        model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
    elif model_name == "resnet101":
        model = models.resnet101(weights=models.ResNet101_Weights.DEFAULT)
    elif model_name == "resnet152":
        model = models.resnet152(weights=models.ResNet152_Weights.DEFAULT)
    else:
        raise ValueError(f"Unsupported model_name: {model_name}")
    model.eval().to(device)
    return (
        nn.Sequential(model.conv1, model.bn1, model.relu, model.maxpool),
        model.layer1, model.layer2, model.layer3, model.layer4,
    )

gap = nn.AdaptiveAvgPool2d((1, 1))
gmp = nn.AdaptiveMaxPool2d((1, 1))

resnet50_blocks = build_resnet("resnet50")
resnet101_blocks = build_resnet("resnet101")
resnet152_blocks = build_resnet("resnet152")

transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])

def preprocess_for_mwt(image_np):
    img = cv2.resize(image_np, (224, 224))
    img = Augmentations.Normalization((0, 1))(img)
    img = np.array(img, np.float32)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = img.transpose(2, 0, 1)
    img = np.expand_dims(img, axis=0)
    return torch.Tensor(img)

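# NOTE: preprocess_for_mwt applies a BGR→RGB conversion, so it implicitly assumes the input
# array is BGR-ordered (as produced by cv2.imread). The /predict handler passes an RGB array
# decoded with PIL, so the channels end up swapped there; whether that matches how the MWT
# model was trained is an assumption worth verifying against the training pipeline.
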
def extract_cbf_features(blocks, img_t):
    block1, block2, block3, block4, block5 = blocks
    with torch.no_grad():
        f1 = block1(img_t)
        f2 = block2(f1)
        f3 = block3(f2)
        f4 = block4(f3)
        f5 = block5(f4)
        p1 = gmp(f1).view(-1)
        p2 = gmp(f2).view(-1)
        p3 = gap(f3).view(-1)
        p4 = gap(f4).view(-1)
        p5 = gap(f5).view(-1)
    return torch.cat([p1, p2, p3, p4, p5], dim=0).cpu().numpy()

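# For the standard torchvision ResNet-50/101/152 backbones, the five pooled blocks yield
# 64 + 256 + 512 + 1024 + 2048 = 3904 features each, so the CIN feature vector assembled
# from the three backbones in the /predict handler below has 3 * 3904 = 11712 values.
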
# =====================================================
# Model 4: Histopathology Classifier (TensorFlow)
# =====================================================
print("🔹 Attempting to load Breast Cancer Histopathology model...")
try:
    classifier = BreastCancerClassifier(fine_tune=False)

    # Safely handle Hugging Face token auth
    hf_token = os.getenv("HF_TOKEN")
    if hf_token:
        if classifier.authenticate_huggingface():
            print("✅ Hugging Face authentication successful.")
        else:
            print("⚠️ Warning: Hugging Face authentication failed, using local model only.")
    else:
        print("⚠️ HF_TOKEN not found in environment — skipping authentication.")

    # Load Path Foundation model
    if classifier.load_path_foundation():
        print("✅ Loaded Path Foundation base model.")
    else:
        print("⚠️ Could not load Path Foundation base model, continuing with local weights only.")

    # Load trained histopathology model
    model_path = "histopathology_trained_model.keras"
    if os.path.exists(model_path):
        classifier.model = tf.keras.models.load_model(model_path)
        print(f"✅ Loaded local histopathology model: {model_path}")
    else:
        print(f"⚠️ Model file not found: {model_path}")
except Exception as e:
    classifier = None
    print(f"❌ Error initializing histopathology model: {e}")

def predict_histopathology(image):
    if classifier is None:
        return {"error": "Histopathology model not available."}
    try:
        if image.mode != "RGB":
            image = image.convert("RGB")
        image = image.resize((224, 224))
        img_array = np.expand_dims(np.array(image).astype("float32") / 255.0, axis=0)
        embeddings = classifier.extract_embeddings(img_array)
        prediction_proba = classifier.model.predict(embeddings, verbose=0)[0]
        predicted_class = int(np.argmax(prediction_proba))
        class_names = ["Benign", "Malignant"]
        # Return confidence as dictionary with both class probabilities (like MWT/CIN)
        confidences = {class_names[i]: float(prediction_proba[i]) for i in range(len(class_names))}
        avg_confidence = float(np.max(prediction_proba)) * 100
        return {
            "model_used": "Histopathology Classifier",
            "prediction": class_names[predicted_class],
            "confidence": confidences,
            "summary": {
                "avg_confidence": round(avg_confidence, 2),
                "ai_interpretation": f"Histopathological analysis indicates {class_names[predicted_class].lower()} tissue with {avg_confidence:.1f}% confidence.",
            },
        }
    except Exception as e:
        return {"error": f"Histopathology prediction failed: {e}"}

# =====================================================
# MAIN ENDPOINT
# =====================================================
# Route decorator assumed: it is not present in the extracted source, but the function clearly
# backs the /predict API route that the static-file catch-all below excludes.
@app.post("/predict")
async def predict(model_name: str = Form(...), file: UploadFile = File(...)):
    print(f"Received prediction request - model: {model_name}, file: {file.filename}")

    # Validate model name
    if model_name not in ["yolo", "mwt", "cin", "histopathology"]:
        return JSONResponse(
            content={
                "error": f"Invalid model_name: {model_name}. Must be one of: yolo, mwt, cin, histopathology"
            },
            status_code=400
        )

    # Validate and read file
    if not file.filename:
        return JSONResponse(
            content={"error": "No file provided"},
            status_code=400
        )
    contents = await file.read()
    if len(contents) == 0:
        return JSONResponse(
            content={"error": "Empty file provided"},
            status_code=400
        )

    # Attempt to open and validate image
    try:
        image = PILImage.open(BytesIO(contents)).convert("RGB")
        image_np = np.array(image)
        if image_np.size == 0:
            raise ValueError("Empty image array")
        print(f"Successfully loaded image, shape: {image_np.shape}")
    except Exception as e:
        return JSONResponse(
            content={"error": f"Invalid image file: {str(e)}"},
            status_code=400
        )

    if model_name == "yolo":
        results = yolo_model(image)
        detections_json = results[0].to_json()
        detections = json.loads(detections_json)
        abnormal_cells = sum(1 for d in detections if d["name"] == "abnormal")
        normal_cells = sum(1 for d in detections if d["name"] == "normal")
        avg_confidence = np.mean([d.get("confidence", 0) for d in detections]) * 100 if detections else 0
        ai_summary = generate_ai_summary(abnormal_cells, normal_cells, avg_confidence)
        output_filename = f"detected_{uuid.uuid4().hex[:8]}.jpg"
        output_path = os.path.join(IMAGES_DIR, output_filename)
        results[0].save(filename=output_path)
        return {
            "model_used": "YOLO Detection",
            "detections": detections,
            "annotated_image_url": f"/outputs/images/{output_filename}",
            "summary": {
                "abnormal_cells": abnormal_cells,
                "normal_cells": normal_cells,
                "avg_confidence": round(float(avg_confidence), 2),
                "ai_interpretation": ai_summary,
            },
        }

    elif model_name == "mwt":
        tensor = preprocess_for_mwt(image_np)
        with torch.no_grad():
            output = mwt_model(tensor.to(device)).cpu()
            probs = torch.softmax(output, dim=1)[0]
        confidences = {mwt_class_names[i]: float(probs[i]) for i in range(2)}
        predicted_label = mwt_class_names[int(torch.argmax(probs).item())]
        # Average / primary confidence for display
        avg_confidence = float(torch.max(probs).item()) * 100
        # Generate a brief AI interpretation using the Mistral client (if available)
        ai_interp = generate_mwt_summary(predicted_label, confidences, avg_confidence)
        return {
            "model_used": "MWT Classifier",
            "prediction": predicted_label,
            "confidence": confidences,
            "summary": {
                "avg_confidence": round(avg_confidence, 2),
                "ai_interpretation": ai_interp,
            },
        }

    elif model_name == "cin":
        if clf is None:
            return JSONResponse(
                content={"error": "CIN classifier not available on server."},
                status_code=503,
            )
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        results = yolo_colposcopy.predict(source=img, conf=0.7, save=False, verbose=False)
        if len(results[0].boxes) == 0:
            return {"error": "No cervix detected"}
        x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].cpu().numpy())
        crop = img[y1:y2, x1:x2]
        crop = cv2.resize(crop, (224, 224))
        img_t = transform(crop).unsqueeze(0).to(device)
        f50 = extract_cbf_features(resnet50_blocks, img_t)
        f101 = extract_cbf_features(resnet101_blocks, img_t)
        f152 = extract_cbf_features(resnet152_blocks, img_t)
        features = np.concatenate([f50, f101, f152]).reshape(1, -1)
        # CAUTION: fitting MinMaxScaler on a single sample collapses every feature to the same
        # value; ideally a scaler fitted on the training data would be persisted and loaded here.
        # Kept as in the original pipeline.
        X_scaled = MinMaxScaler().fit_transform(features)
        pred = clf.predict(X_scaled)[0]
        proba = clf.predict_proba(X_scaled)[0]
        classes = ["Low-grade", "High-grade"]  # Binary CIN classification
        predicted_label = classes[pred]
        confidences = {classes[i]: float(proba[i]) for i in range(len(classes))}
        # Map to a more detailed classification based on confidence
        if predicted_label == "High-grade" and confidences["High-grade"] > 0.8:
            detailed_class = "CIN3"
        elif predicted_label == "High-grade":
            detailed_class = "CIN2"
        else:
            detailed_class = "CIN1"
        # Average / primary confidence for display
        avg_confidence = float(np.max(proba)) * 100
        # Generate a brief AI interpretation using the Mistral client (if available)
        ai_interp = generate_cin_summary(predicted_label, confidences, avg_confidence)
        return {
            "model_used": "CIN Classifier",
            "prediction": detailed_class,
            "grade": predicted_label,
            "confidence": confidences,
            "summary": {
                "avg_confidence": round(avg_confidence, 2),
                "ai_interpretation": ai_interp,
            },
        }

    elif model_name == "histopathology":
        result = predict_histopathology(image)
        return result

    else:
        return JSONResponse(content={"error": "Invalid model name"}, status_code=400)

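# Example request against this endpoint (route path assumed above; port as configured at the
# bottom of this file, 7860 by default):
#   curl -X POST "http://localhost:7860/predict" \
#        -F "model_name=yolo" \
#        -F "file=@sample_smear.jpg"   # sample_smear.jpg is a placeholder filename
# The returned "summary" block is what the report generator below embeds into reports.
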
# =====================================================
# ROUTES
# =====================================================
def create_designed_pdf(pdf_path, report_data, analysis_summary_json):
    doc = SimpleDocTemplate(pdf_path, pagesize=letter,
                            rightMargin=72, leftMargin=72,
                            topMargin=72, bottomMargin=18)
    styles = getSampleStyleSheet()
    story = []

    # getSampleStyleSheet() already defines a 'Title' style and styles.add() raises on duplicate
    # names, so the custom title style needs its own name.
    styles.add(ParagraphStyle(name='ReportTitle', fontSize=20, fontName='Helvetica-Bold', alignment=TA_CENTER, textColor=navy))
    styles.add(ParagraphStyle(name='Section', fontSize=14, fontName='Helvetica-Bold', spaceBefore=10, spaceAfter=6))
    styles.add(ParagraphStyle(name='NormalSmall', fontSize=10, leading=12))
    styles.add(ParagraphStyle(name='Heading', fontSize=16, fontName='Helvetica-Bold', textColor=navy, spaceBefore=6, spaceAfter=4))

    patient = report_data['patient']
    analysis = report_data.get('analysis', {})

    # Safely parse analysis_summary_json
    try:
        ai_summary = json.loads(analysis_summary_json) if analysis_summary_json else {}
    except (json.JSONDecodeError, TypeError):
        ai_summary = {}

    # Determine report type based on model used
    model_used = ai_summary.get('model_used', '')
    if 'YOLO' in model_used or 'yolo' in str(analysis.get('id', '')).lower():
        report_type = "CYTOLOGY"
        report_title = "Cytology Report"
    elif 'CIN' in model_used or 'cin' in str(analysis.get('id', '')).lower() or 'colpo' in str(analysis.get('id', '')).lower():
        report_type = "COLPOSCOPY"
        report_title = "Colposcopy Report"
    elif 'histo' in str(analysis.get('id', '')).lower() or 'histopathology' in model_used.lower():
        report_type = "HISTOPATHOLOGY"
        report_title = "Histopathology Report"
    else:
        report_type = "CYTOLOGY"
        report_title = "Medical Analysis Report"

    # Header
    story.append(Paragraph("MANALIFE AI", styles['ReportTitle']))
    story.append(Paragraph("Advanced Medical Analysis", styles['NormalSmall']))
    story.append(Spacer(1, 0.3*inch))
    story.append(Paragraph(f"MEDICAL ANALYSIS REPORT OF {report_type}", styles['Heading']))
    story.append(Paragraph(report_title, styles['Section']))
    story.append(Spacer(1, 0.2*inch))

    # Report ID and Date
    story.append(Paragraph(f"<b>Report ID:</b> {report_data.get('report_id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"<b>Generated:</b> {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Patient Information Section
    story.append(Paragraph("Patient Information", styles['Section']))
    story.append(Paragraph(f"<b>Patient ID:</b> {patient.get('id', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"<b>Exam Date:</b> {patient.get('exam_date', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"<b>Physician:</b> {patient.get('physician', 'N/A')}", styles['NormalSmall']))
    story.append(Paragraph(f"<b>Facility:</b> {patient.get('facility', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Sample Information Section
    story.append(Paragraph("Sample Information", styles['Section']))
    story.append(Paragraph(f"<b>Specimen Type:</b> {patient.get('specimen_type', 'Cervical Cytology')}", styles['NormalSmall']))
    story.append(Paragraph(f"<b>Clinical History:</b> {patient.get('clinical_history', 'N/A')}", styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # AI Analysis Section
    story.append(Paragraph("AI-ASSISTED ANALYSIS", styles['Section']))
    story.append(Paragraph("<b>System:</b> Manalife AI System — Automated Analysis", styles['NormalSmall']))
    story.append(Paragraph(f"<b>Confidence Score:</b> {ai_summary.get('avg_confidence', 'N/A')}%", styles['NormalSmall']))

    # Add metrics based on report type
    if report_type == "HISTOPATHOLOGY":
        # For histopathology, show Benign/Malignant confidence
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict):
            benign_conf = confidence_dict.get('Benign', 0) * 100
            malignant_conf = confidence_dict.get('Malignant', 0) * 100
            story.append(Paragraph(f"<b>Benign Confidence:</b> {benign_conf:.2f}%", styles['NormalSmall']))
            story.append(Paragraph(f"<b>Malignant Confidence:</b> {malignant_conf:.2f}%", styles['NormalSmall']))
    elif report_type == "CYTOLOGY":
        # For cytology (YOLO), show abnormal/normal cells
        if 'abnormal_cells' in ai_summary:
            story.append(Paragraph(f"<b>Abnormal Cells:</b> {ai_summary.get('abnormal_cells', 'N/A')}", styles['NormalSmall']))
        if 'normal_cells' in ai_summary:
            story.append(Paragraph(f"<b>Normal Cells:</b> {ai_summary.get('normal_cells', 'N/A')}", styles['NormalSmall']))
    else:
        # For CIN/Colposcopy, show class confidences
        confidence_dict = ai_summary.get('confidence', {})
        if isinstance(confidence_dict, dict):
            for cls, val in confidence_dict.items():
                conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                story.append(Paragraph(f"<b>{cls} Confidence:</b> {conf_pct:.2f}%", styles['NormalSmall']))

    story.append(Spacer(1, 0.1*inch))
    story.append(Paragraph("<b>AI Interpretation:</b>", styles['NormalSmall']))
    story.append(Paragraph(ai_summary.get('ai_interpretation', 'Not available.'), styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Doctor's Notes
    story.append(Paragraph("Doctor's Notes", styles['Section']))
    story.append(Paragraph(report_data.get('doctor_notes') or 'No additional notes provided.', styles['NormalSmall']))
    story.append(Spacer(1, 0.2*inch))

    # Recommendations
    story.append(Paragraph("RECOMMENDATIONS", styles['Section']))
    story.append(Paragraph("Continue routine screening as per standard guidelines. Follow up as directed by your physician.", styles['NormalSmall']))
    story.append(Spacer(1, 0.3*inch))

    # Signatures
    story.append(Paragraph("Signatures", styles['Section']))
    story.append(Paragraph("Dr. Emily Roberts, MD (Cytopathologist)", styles['NormalSmall']))
    story.append(Paragraph("Dr. James Wilson, MD (Pathologist)", styles['NormalSmall']))
    story.append(Spacer(1, 0.1*inch))
    story.append(Paragraph(f"Generated on: {datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')}", styles['NormalSmall']))

    doc.build(story)

# Route decorator assumed: it is not present in the extracted source and the exact path is an
# assumption; the function clearly backs the report-generation API used by the frontend.
@app.post("/reports/generate")
async def generate_report(
    patient_id: str = Form(...),
    exam_date: str = Form(...),
    metadata: str = Form(...),
    notes: str = Form(None),
    analysis_id: str = Form(None),
    analysis_summary: str = Form(None),
):
    """Generate a structured medical report from analysis results and metadata."""
    try:
        # Create reports directory if it doesn't exist
        reports_dir = os.path.join(OUTPUT_DIR, "reports")
        os.makedirs(reports_dir, exist_ok=True)

        # Generate unique report ID
        report_id = f"{patient_id}_{uuid.uuid4().hex[:8]}"
        report_dir = os.path.join(reports_dir, report_id)
        os.makedirs(report_dir, exist_ok=True)

        # Parse metadata
        metadata_dict = json.loads(metadata)

        # Get analysis results - assuming stored in memory or retrievable
        # TODO: Implement analysis results storage/retrieval

        # Construct report data
        report_data = {
            "report_id": report_id,
            "generated_at": datetime.datetime.now().isoformat(),
            "patient": {
                "id": patient_id,
                "exam_date": exam_date,
                **metadata_dict
            },
            "analysis": {
                "id": analysis_id,
                # If the analysis_id is actually an annotated image URL, store it for report embedding
                "annotated_image_url": analysis_id,
                # TODO: Add actual analysis results
            },
            "doctor_notes": notes
        }

        # Save report data
        report_json = os.path.join(report_dir, "report.json")
        with open(report_json, "w", encoding="utf-8") as f:
            json.dump(report_data, f, indent=2, ensure_ascii=False)

        # Attempt to create a PDF version if reportlab is available
        pdf_url = None
        if REPORTLAB_AVAILABLE:
            try:
                pdf_path = os.path.join(report_dir, "report.pdf")
                create_designed_pdf(pdf_path, report_data, analysis_summary)
                pdf_url = f"/outputs/reports/{report_id}/report.pdf"
            except Exception as e:
                print(f"Error creating designed PDF: {e}")
                pdf_url = None

        # Parse analysis_summary to get AI results
        try:
            ai_summary = json.loads(analysis_summary) if analysis_summary else {}
        except (json.JSONDecodeError, TypeError):
            ai_summary = {}

        # Determine report type based on analysis summary or model used
        model_used = ai_summary.get('model_used', '')
        if 'YOLO' in model_used or 'yolo' in str(analysis_id).lower():
            report_type = "Cytology"
            report_title = "Cytology Report"
        elif 'CIN' in model_used or 'cin' in str(analysis_id).lower() or 'colpo' in str(analysis_id).lower():
            report_type = "Colposcopy"
            report_title = "Colposcopy Report"
        elif 'histo' in str(analysis_id).lower() or 'histopathology' in model_used.lower():
            report_type = "Histopathology"
            report_title = "Histopathology Report"
        else:
            # Default fallback
            report_type = "Cytology"
            report_title = "Medical Analysis Report"

        # Build analysis metrics HTML based on report type
        if report_type == "Histopathology":
            # For histopathology, show Benign/Malignant confidence from the confidence dict
            confidence_dict = ai_summary.get('confidence', {})
            benign_conf = confidence_dict.get('Benign', 0) * 100 if isinstance(confidence_dict, dict) else 0
            malignant_conf = confidence_dict.get('Malignant', 0) * 100 if isinstance(confidence_dict, dict) else 0
            analysis_metrics_html = f"""
                <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
                <tr><th>Confidence Score</th><td>{ai_summary.get('avg_confidence', 'N/A')}%</td></tr>
                <tr><th>Benign Confidence</th><td>{benign_conf:.2f}%</td></tr>
                <tr><th>Malignant Confidence</th><td>{malignant_conf:.2f}%</td></tr>
            """
        elif report_type == "Cytology":
            # For cytology (YOLO), show abnormal/normal cells
            analysis_metrics_html = f"""
                <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
                <tr><th>Confidence Score</th><td>{ai_summary.get('avg_confidence', 'N/A')}%</td></tr>
                <tr><th>Abnormal Cells</th><td>{ai_summary.get('abnormal_cells', 'N/A')}</td></tr>
                <tr><th>Normal Cells</th><td>{ai_summary.get('normal_cells', 'N/A')}</td></tr>
            """
        else:
            # For CIN/Colposcopy or other models, show generic confidence
            confidence_dict = ai_summary.get('confidence', {})
            confidence_rows = ""
            if isinstance(confidence_dict, dict):
                for cls, val in confidence_dict.items():
                    conf_pct = val * 100 if isinstance(val, (int, float)) else 0
                    confidence_rows += f"<tr><th>{cls} Confidence</th><td>{conf_pct:.2f}%</td></tr>\n                "
            analysis_metrics_html = f"""
                <tr><th>System</th><td>Manalife AI System — Automated Analysis</td></tr>
                <tr><th>Confidence Score</th><td>{ai_summary.get('avg_confidence', 'N/A')}%</td></tr>
                {confidence_rows}
            """

        # Build final HTML including download links and embedded annotated image
        report_html = os.path.join(report_dir, "report.html")
        json_url = f"/outputs/reports/{report_id}/report.json"
        html_url = f"/outputs/reports/{report_id}/report.html"
        annotated_img = report_data.get("analysis", {}).get("annotated_image_url") or ""
        # Build an absolute URL for the annotated image when only a relative path was provided
        # (host and port here are those of the local development server)
        annotated_img_full = f"http://localhost:8000{annotated_img}" if annotated_img and annotated_img.startswith('/') else annotated_img
        download_pdf_btn = f'<a href="{pdf_url}" download style="text-decoration:none"><button class="btn-secondary">Download PDF</button></a>' if pdf_url else ''

        # Format generated time
        generated_time = datetime.datetime.now().strftime('%b %d, %Y, %I:%M %p')

        html_content = f"""<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8" />
  <meta name="viewport" content="width=device-width,initial-scale=1" />
  <title>Medical Analysis Report — Manalife AI</title>
  <style>
    :root{{--bg:#f8fafc;--card:#ffffff;--muted:#6b7280;--accent:#0f172a}}
    body{{font-family:Inter,ui-sans-serif,system-ui,-apple-system,"Segoe UI",Roboto,"Helvetica Neue",Arial;margin:0;background:var(--bg);color:var(--accent);line-height:1.45}}
    .container{{max-width:900px;margin:36px auto;padding:20px}}
    header{{display:flex;align-items:center;gap:16px}}
    .brand{{display:flex;flex-direction:column}}
    h1{{margin:0;font-size:20px}}
    .sub{{color:var(--muted);font-size:13px}}
    .card{{background:var(--card);box-shadow:0 6px 18px rgba(15,23,42,0.06);border-radius:12px;padding:20px;margin-top:18px}}
    .grid{{display:grid;grid-template-columns:1fr 1fr;gap:12px}}
    .section-title{{font-weight:600;margin-top:8px}}
    table{{width:100%;border-collapse:collapse;margin-top:8px}}
    td,th{{padding:8px;border-bottom:1px dashed #e6e9ef;text-align:left;font-size:14px}}
    .full{{grid-column:1/-1}}
    .muted{{color:var(--muted);font-size:13px}}
    .footer{{margin-top:20px;font-size:13px;color:var(--muted)}}
    .pill{{background:#eef2ff;color:#1e3a8a;padding:6px 10px;border-radius:999px;font-weight:600;font-size:13px}}
    @media (max-width:700px){{.grid{{grid-template-columns:1fr}}}}
    .signatures{{display:flex;gap:20px;flex-wrap:wrap;margin-top:12px}}
    .sig{{background:#fbfbfd;border:1px solid #f0f1f5;padding:10px;border-radius:8px;min-width:180px}}
    .annotated-image{{max-width:100%;height:auto;border-radius:8px;margin-top:12px;border:1px solid #e6e9ef}}
    .btn-primary{{padding:10px 14px;border-radius:8px;border:1px solid #2563eb;background:#2563eb;color:white;font-weight:700;cursor:pointer}}
    .btn-secondary{{padding:10px 14px;border-radius:8px;border:1px solid #e6eefc;background:#eef2ff;font-weight:700;cursor:pointer}}
    .actions-bar{{margin-top:12px;display:flex;gap:8px;flex-wrap:wrap}}
  </style>
</head>
<body>
  <div class="container">
    <header>
      <div>
        <!-- Use the static logo from frontend public/ (copied to dist by Vite) -->
        <img src="/manalife_LOGO.jpg" alt="Manalife Logo" width="64" height="64">
      </div>
      <div class="brand">
        <h1>MANALIFE AI — Medical Analysis</h1>
        <div class="sub">Advanced cytology, colposcopy and histopathology reporting</div>
        <div class="muted">contact@manalife.ai • +1 (555) 123-4567</div>
      </div>
    </header>
    <div class="card">
      <div style="display:flex;justify-content:space-between;align-items:center;gap:12px;flex-wrap:wrap">
        <div>
          <div class="muted">MEDICAL ANALYSIS REPORT OF {report_type.upper()}</div>
          <h2 style="margin:6px 0 0 0">{report_title}</h2>
        </div>
        <div style="text-align:right">
          <div class="pill">Report ID: {report_id}</div>
          <div class="muted" style="margin-top:6px">Generated: {generated_time}</div>
        </div>
      </div>
      <hr style="border:none;border-top:1px solid #eef2f6;margin:16px 0">
      <div class="grid">
        <div>
          <div class="section-title">Patient Information</div>
          <table>
            <tr><th>Patient ID</th><td>{patient_id}</td></tr>
            <tr><th>Exam Date</th><td>{exam_date}</td></tr>
            <tr><th>Physician</th><td>{metadata_dict.get('physician', 'N/A')}</td></tr>
            <tr><th>Facility</th><td>{metadata_dict.get('facility', 'N/A')}</td></tr>
          </table>
        </div>
        <div>
          <div class="section-title">Sample Information</div>
          <table>
            <tr><th>Specimen Type</th><td>{metadata_dict.get('specimen_type', 'N/A')}</td></tr>
            <tr><th>Clinical History</th><td>{metadata_dict.get('clinical_history', 'N/A')}</td></tr>
            <tr><th>Collected</th><td>{exam_date}</td></tr>
            <tr><th>Reported</th><td>{generated_time}</td></tr>
          </table>
        </div>
        <div class="full">
          <div class="section-title">AI-Assisted Analysis</div>
          <table>
            {analysis_metrics_html}
          </table>
          <div style="margin-top:12px;padding:12px;background:#f8fafc;border-radius:8px;border-left:4px solid #2563eb">
            <div style="font-weight:600;margin-bottom:6px">AI Interpretation:</div>
            <div class="muted">{ai_summary.get('ai_interpretation', 'No AI interpretation available.')}</div>
          </div>
        </div>
        {'<div class="full"><div class="section-title">Annotated Analysis Image</div><img src="' + annotated_img_full + '" class="annotated-image" alt="Annotated Analysis Result" /></div>' if annotated_img else ''}
        <div class="full">
          <div class="section-title">Doctor's Notes</div>
          <p class="muted">{notes or 'No additional notes provided.'}</p>
        </div>
        <div class="full">
          <div class="section-title">Recommendations</div>
          <p class="muted">Continue routine screening as per standard guidelines. Follow up as directed by your physician.</p>
        </div>
        <div class="full">
          <div class="section-title">Signatures</div>
          <div class="signatures">
            <div class="sig">
              <div style="font-weight:700">Dr. Emily Roberts</div>
              <div class="muted">MD, pathologist</div>
            </div>
            <div class="sig">
              <div style="font-weight:700">Dr. James Wilson</div>
              <div class="muted">MD, pathologist</div>
            </div>
          </div>
        </div>
      </div>
      <div class="footer">
        <div>AI System: Manalife AI — Automated Analysis</div>
        <div style="margin-top:6px">Report generated: {report_data['generated_at']}</div>
      </div>
    </div>
    <div class="actions-bar">
      {download_pdf_btn}
      <button class="btn-secondary" onclick="window.print()">Print Report</button>
    </div>
  </div>
</body>
</html>"""

        with open(report_html, "w", encoding="utf-8") as f:
            f.write(html_content)

        return {
            "report_id": report_id,
            "json_url": json_url,
            "html_url": html_url,
            "pdf_url": pdf_url,
        }
    except Exception as e:
        return JSONResponse(
            content={"error": f"Failed to generate report: {str(e)}"},
            status_code=500
        )

# Route decorators below are assumed; they are not present in the extracted source, but the
# paths follow from the functions' behaviour and from the API prefixes excluded by the
# static-file catch-all further down ("reports", "models", "health").
@app.get("/reports/{report_id}")
async def get_report(report_id: str):
    """Fetch a generated report by ID."""
    report_dir = os.path.join(OUTPUT_DIR, "reports", report_id)
    report_json = os.path.join(report_dir, "report.json")
    if not os.path.exists(report_json):
        return JSONResponse(
            content={"error": "Report not found"},
            status_code=404
        )
    with open(report_json, "r") as f:
        report_data = json.load(f)
    return report_data

@app.get("/reports")
async def list_reports(patient_id: str = None):
    """List all generated reports, optionally filtered by patient ID."""
    reports_dir = os.path.join(OUTPUT_DIR, "reports")
    if not os.path.exists(reports_dir):
        return {"reports": []}
    reports = []
    for report_id in os.listdir(reports_dir):
        report_json = os.path.join(reports_dir, report_id, "report.json")
        if os.path.exists(report_json):
            with open(report_json, "r") as f:
                report_data = json.load(f)
            if not patient_id or report_data["patient"]["id"] == patient_id:
                reports.append({
                    "report_id": report_id,
                    "patient_id": report_data["patient"]["id"],
                    "exam_date": report_data["patient"]["exam_date"],
                    "generated_at": report_data["generated_at"]
                })
    return {"reports": sorted(reports, key=lambda r: r["generated_at"], reverse=True)}

@app.get("/models")
def get_models():
    return {"available_models": ["yolo", "mwt", "cin", "histopathology"]}

@app.get("/health")
def health():
    return {"message": "Pathora Medical Diagnostic API is running!"}

# =====================================================
# FRONTEND
# =====================================================
# Serve frontend only if it has been built; avoid startup failure when dist/ is missing.
FRONTEND_DIST = os.path.abspath(os.path.join(os.path.dirname(__file__), "../frontend/dist"))
# Check if frontend/dist exists in /app (Docker), otherwise check relative to script location
if not os.path.isdir(FRONTEND_DIST):
    # Fallback for Docker: frontend is copied to ./frontend/dist during build
    FRONTEND_DIST = os.path.join(os.path.dirname(__file__), "frontend/dist")

ASSETS_DIR = os.path.join(FRONTEND_DIST, "assets")
if os.path.isdir(ASSETS_DIR):
    app.mount("/assets", StaticFiles(directory=ASSETS_DIR), name="assets")
else:
    print("ℹ️ Frontend assets directory not found — skipping /assets mount.")

# Route decorators assumed (not present in the extracted source): the root path serves the SPA
# entry point and a catch-all path serves remaining static files and client-side routes.
@app.get("/")
async def serve_frontend():
    index_path = os.path.join(FRONTEND_DIST, "index.html")
    if os.path.isfile(index_path):
        return FileResponse(index_path)
    return JSONResponse({"message": "Backend is running. Frontend build not found."})

@app.get("/{file_path:path}")
async def serve_static_files(file_path: str):
    """Serve static files from frontend dist (images, logos, etc.)."""
    # Skip API routes
    if file_path.startswith(("predict", "reports", "models", "health", "outputs", "assets", "cyto", "colpo", "histo")):
        return JSONResponse({"error": "Not found"}, status_code=404)
    # Try to serve the file from the dist root
    static_file = os.path.join(FRONTEND_DIST, file_path)
    if os.path.isfile(static_file):
        return FileResponse(static_file)
    # Fall back to index.html for client-side routing
    index_path = os.path.join(FRONTEND_DIST, "index.html")
    if os.path.isfile(index_path):
        return FileResponse(index_path)
    return JSONResponse({"error": "Not found"}, status_code=404)

if __name__ == "__main__":
    # Use the PORT provided by the environment (Hugging Face Spaces sets PORT=7860)
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)