Spaces:
Sleeping
Sleeping
| """ | |
| Google Drive Batch Processor for TB-Guard-XAI | |
| Automatically processes chest X-rays uploaded to Google Drive | |
| Uses live Hugging Face Space endpoint for analysis | |
| """ | |
| import os | |
| import io | |
| import time | |
| import requests | |
| from pathlib import Path | |
| from datetime import datetime | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from google.auth.transport.requests import Request | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaFileUpload, MediaIoBaseDownload | |
| import pickle | |
| from fpdf import FPDF | |
# Live Hugging Face Space that hosts the TB-Guard-XAI analysis endpoint.
HF_SPACE_URL = "https://mistral-hackaton-2026-tb-guard-xai.hf.space"  # Update with your actual URL
API_ENDPOINT = f"{HF_SPACE_URL}/analyze"

# OAuth scope: full Drive access is required to create folders and move files.
SCOPES = ['https://www.googleapis.com/auth/drive']

# Google Drive folder names used by the batch pipeline.
INBOX_FOLDER = "TB_XRay_Inbox"        # incoming X-ray images to process
REPORTS_FOLDER = "TB_Reports"         # generated PDF reports
PROCESSED_FOLDER = "TB_Processed"     # originals archived after analysis
class GoogleDriveBatchProcessor:
    """Batch processor for Google Drive integration using HF Space API.

    Watches a Drive inbox folder for chest X-ray images, sends each image to
    the TB-Guard-XAI Hugging Face Space for analysis, writes a PDF report to
    the reports folder, and moves the processed original into an archive
    folder.
    """

    def __init__(self, hf_space_url=HF_SPACE_URL):
        """Authenticate with Drive, probe the HF Space, and ensure folders exist.

        Args:
            hf_space_url: Base URL of the Hugging Face Space (no trailing slash).
        """
        self.service = self.authenticate()
        self.api_endpoint = f"{hf_space_url}/analyze"
        self.processed_files = set()  # Drive file IDs already handled this session
        # Probe the Space's /status endpoint so misconfiguration surfaces early.
        print("π Testing connection to Hugging Face Space...")
        print(f" URL: {hf_space_url}")
        try:
            response = requests.get(f"{hf_space_url}/status", timeout=10)
            if response.status_code == 200:
                print(" β API is online and ready!")
            else:
                print(f" β οΈ API returned status {response.status_code}")
        except Exception as e:
            # Best-effort probe: an unreachable API is reported but not fatal here.
            print(f" β οΈ Could not connect to API: {e}")
            print(" π‘ Make sure your Hugging Face Space is running")
        # Create working folders in Drive if they don't exist yet.
        self.inbox_id = self.get_or_create_folder(INBOX_FOLDER)
        self.reports_id = self.get_or_create_folder(REPORTS_FOLDER)
        self.processed_id = self.get_or_create_folder(PROCESSED_FOLDER)
        print("\nβ Google Drive folders ready:")
        print(f" π₯ Inbox: {INBOX_FOLDER}")
        print(f" π Reports: {REPORTS_FOLDER}")
        print(f" β Processed: {PROCESSED_FOLDER}")

    def authenticate(self):
        """Authenticate with the Google Drive API and return a Drive v3 service.

        Uses a cached ``token.pickle`` when present; otherwise runs the OAuth
        installed-app flow against ``credentials.json``.

        Raises:
            FileNotFoundError: if ``credentials.json`` is missing.
        """
        creds = None
        # Token file stores the user's access and refresh tokens.
        # NOTE(review): pickle.load on a local token file is only safe because
        # this script is the sole writer of token.pickle.
        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                creds = pickle.load(token)
        # If no valid credentials, refresh silently or run the interactive flow.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                if not os.path.exists('credentials.json'):
                    print("β ERROR: credentials.json not found!")
                    print("\nπ Setup Instructions:")
                    print("1. Go to https://console.cloud.google.com/")
                    print("2. Create a new project or select existing")
                    print("3. Enable Google Drive API")
                    print("4. Create OAuth 2.0 credentials (Desktop app)")
                    print("5. Download credentials.json to this folder")
                    print("6. Run this script again")
                    raise FileNotFoundError("credentials.json not found")
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save credentials for the next run.
            with open('token.pickle', 'wb') as token:
                pickle.dump(creds, token)
        return build('drive', 'v3', credentials=creds)

    def get_or_create_folder(self, folder_name):
        """Return the Drive folder ID for ``folder_name``, creating it if absent."""
        # Search for an existing (non-trashed) folder with this name.
        query = f"name='{folder_name}' and mimeType='application/vnd.google-apps.folder' and trashed=false"
        results = self.service.files().list(q=query, fields="files(id, name)").execute()
        folders = results.get('files', [])
        if folders:
            return folders[0]['id']
        # Not found: create it.
        file_metadata = {
            'name': folder_name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        folder = self.service.files().create(body=file_metadata, fields='id').execute()
        print(f"π Created folder: {folder_name}")
        return folder.get('id')

    def list_inbox_files(self):
        """List all PNG/JPEG image files currently in the inbox folder."""
        query = f"'{self.inbox_id}' in parents and trashed=false and (mimeType='image/png' or mimeType='image/jpeg')"
        results = self.service.files().list(
            q=query,
            fields="files(id, name, createdTime)"
        ).execute()
        return results.get('files', [])

    def download_file(self, file_id, file_name):
        """Download a Drive file to a local temp path and return that path."""
        request = self.service.files().get_media(fileId=file_id)
        temp_path = Path("temp_gdrive") / file_name
        temp_path.parent.mkdir(exist_ok=True)
        # Context manager guarantees the handle closes even if a chunk download
        # raises (the original leaked the open handle on error).
        with io.FileIO(str(temp_path), 'wb') as fh:
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
        return temp_path

    def upload_file(self, file_path, folder_id, file_name=None):
        """Upload a local file into the given Drive folder; return its file ID.

        Args:
            file_path: Local path of the file to upload.
            folder_id: Destination Drive folder ID.
            file_name: Name to use in Drive; defaults to the local file name.
        """
        if file_name is None:
            file_name = Path(file_path).name
        file_metadata = {
            'name': file_name,
            'parents': [folder_id]
        }
        media = MediaFileUpload(str(file_path), resumable=True)
        file = self.service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()
        return file.get('id')

    def move_file(self, file_id, new_folder_id):
        """Move a Drive file out of all current parents into ``new_folder_id``."""
        # Fetch current parents so they can all be removed in one update call.
        file = self.service.files().get(fileId=file_id, fields='parents').execute()
        previous_parents = ",".join(file.get('parents'))
        self.service.files().update(
            fileId=file_id,
            addParents=new_folder_id,
            removeParents=previous_parents,
            fields='id, parents'
        ).execute()

    def generate_pdf_report(self, file_name, analysis_result, output_path):
        """Render the analysis result into a one-page PDF report.

        Args:
            file_name: Original X-ray file name (shown in the report header).
            analysis_result: Dict returned by the HF Space /analyze endpoint;
                must contain 'prediction', 'probability', 'uncertainty' and
                'uncertainty_std'; other keys are optional.
            output_path: Local path where the PDF is written.
        """
        pdf = FPDF()
        pdf.add_page()
        # Title
        pdf.set_font('Arial', 'B', 16)
        pdf.cell(0, 10, 'TB-Guard-XAI Clinical Report', 0, 1, 'C')
        pdf.ln(5)
        # File / run metadata
        pdf.set_font('Arial', '', 10)
        pdf.cell(0, 6, f'X-Ray File: {file_name}', 0, 1)
        pdf.cell(0, 6, f'Analysis Date: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}', 0, 1)
        pdf.cell(0, 6, f'System: TB-Guard-XAI v2.0 (Offline Mode: {analysis_result.get("mode", "unknown")})', 0, 1)
        pdf.ln(5)
        # Quantitative results
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 8, 'Analysis Results:', 0, 1)
        pdf.set_font('Arial', '', 10)
        pdf.cell(0, 6, f'Prediction: {analysis_result["prediction"]}', 0, 1)
        pdf.cell(0, 6, f'TB Probability: {analysis_result["probability"]*100:.1f}%', 0, 1)
        pdf.cell(0, 6, f'Uncertainty: {analysis_result["uncertainty"]} (std: {analysis_result["uncertainty_std"]:.4f})', 0, 1)
        pdf.cell(0, 6, f'Attention Region: {analysis_result.get("gradcam_region", "N/A")}', 0, 1)
        pdf.ln(5)
        # Clinical synthesis (free text from the model)
        pdf.set_font('Arial', 'B', 12)
        pdf.cell(0, 8, 'Clinical Synthesis:', 0, 1)
        pdf.set_font('Arial', '', 9)
        synthesis = analysis_result.get("explanation", "No synthesis available")
        # Strip markdown markers the PDF font cannot render meaningfully.
        synthesis = synthesis.replace('#', '').replace('*', '').replace('`', '')
        for line in synthesis.split('\n'):
            line = line.strip()
            if line:
                pdf.multi_cell(0, 5, line)
        pdf.ln(5)
        # Mandatory disclaimer
        pdf.set_font('Arial', 'I', 8)
        pdf.multi_cell(0, 4, 'DISCLAIMER: This is a screening tool, not a diagnostic tool. All findings must be confirmed by qualified healthcare professionals and appropriate diagnostic tests.')
        pdf.output(str(output_path))

    def analyze_xray_via_api(self, image_path):
        """POST the image to the HF Space /analyze endpoint.

        Returns:
            The parsed JSON result dict, or None on timeout/error.
        """
        try:
            with open(image_path, 'rb') as f:
                files = {'file': (Path(image_path).name, f, 'image/png')}
                data = {
                    'symptoms': '',  # No symptoms for batch processing
                    'age_group': 'Adult (18-64)',  # Default
                    'threshold': 0.5
                }
                # The POST must stay inside the `with`: requests streams the
                # open file handle as multipart data.
                response = requests.post(
                    self.api_endpoint,
                    files=files,
                    data=data,
                    timeout=60  # 60 second timeout
                )
            if response.status_code == 200:
                return response.json()
            print(f" β οΈ API error: {response.status_code}")
            print(f" Response: {response.text[:200]}")
            return None
        except requests.exceptions.Timeout:
            print(" β οΈ API timeout (>60s)")
            return None
        except Exception as e:
            print(f" β οΈ API call failed: {e}")
            return None

    def process_file(self, file_info):
        """Process a single X-ray file end to end; return True on success.

        Downloads the image, analyzes it via the HF Space API, uploads a PDF
        report, and moves the original to the processed folder.
        """
        file_id = file_info['id']
        file_name = file_info['name']
        print(f"\nπ Processing: {file_name}")
        try:
            # Download file
            print(" π₯ Downloading from Google Drive...")
            local_path = self.download_file(file_id, file_name)
            # Analyze via API
            print(" π§ Sending to Hugging Face Space for analysis...")
            result = self.analyze_xray_via_api(local_path)
            if result is None:
                print(f" β Analysis failed for {file_name}")
                local_path.unlink()
                return False
            # The API may report an application-level error in the payload.
            if 'error' in result:
                print(f" β API error: {result['error']}")
                local_path.unlink()
                return False
            # Show results
            mode = result.get('mode', 'unknown')
            prob = result.get('probability', 0)
            uncertainty = result.get('uncertainty', 'Unknown')
            print(f" π Results: {result.get('prediction', 'Unknown')}")
            print(f" β’ Probability: {prob*100:.1f}%")
            print(f" β’ Uncertainty: {uncertainty}")
            print(f" β’ Mode: {mode.upper()}")
            # Generate PDF report
            print(" π Generating PDF report...")
            report_name = Path(file_name).stem + "_report.pdf"
            report_path = Path("temp_gdrive") / report_name
            self.generate_pdf_report(file_name, result, report_path)
            # Upload report
            print(" π€ Uploading report to Google Drive...")
            self.upload_file(report_path, self.reports_id, report_name)
            # Move original to processed folder
            print(" β Moving to processed folder...")
            self.move_file(file_id, self.processed_id)
            # Cleanup local temp files
            local_path.unlink()
            report_path.unlink()
            print(f" β Complete: {file_name} β {report_name}")
            return True
        except Exception as e:
            print(f" β Error processing {file_name}: {e}")
            import traceback
            traceback.print_exc()
            return False

    def watch_and_process(self, interval=30):
        """Poll the inbox folder forever, processing any new files.

        Args:
            interval: Seconds to sleep between polls. Stops on Ctrl+C.
        """
        print("\n" + "="*60)
        print("π TB-Guard-XAI Google Drive Batch Processor")
        print("="*60)
        print(f"\nπ Watching folder: {INBOX_FOLDER}")
        print(f"β±οΈ Check interval: {interval} seconds")
        print(f"π Reports will be saved to: {REPORTS_FOLDER}")
        # BUG FIX: the original omitted the f-prefix, printing the literal
        # text "{INBOX_FOLDER}" instead of the folder name.
        print(f"\nπ‘ Upload X-ray images to '{INBOX_FOLDER}' folder in Google Drive")
        print("π Press Ctrl+C to stop\n")
        try:
            while True:
                files = self.list_inbox_files()
                # Skip files already processed in this session.
                new_files = [f for f in files if f['id'] not in self.processed_files]
                if new_files:
                    print(f"\n㪠Found {len(new_files)} new file(s)")
                    for file_info in new_files:
                        success = self.process_file(file_info)
                        if success:
                            self.processed_files.add(file_info['id'])
                else:
                    print(f"β³ {datetime.now().strftime('%H:%M:%S')} - No new files. Waiting...")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n\nπ Stopping batch processor...")
            print("β Processed files will remain in Google Drive")
def main():
    """Main entry point: build the processor and run once or in watch mode.

    CLI usage:
        script.py [SPACE_URL] [once]
    A first argument starting with "http" overrides the HF Space URL (which
    otherwise comes from the HF_SPACE_URL environment variable or the module
    default). A trailing "once" argument processes the inbox a single time
    instead of watching continuously.
    """
    import sys

    print("π§ Initializing TB-Guard-XAI Batch Processor...")
    print("π Using Hugging Face Space API for analysis")

    # Resolve the Space URL: env var first, then an explicit CLI override.
    hf_url = os.getenv("HF_SPACE_URL", HF_SPACE_URL)
    args = sys.argv[1:]
    if args and args[0].startswith("http"):
        hf_url = args[0]
        print(f"π Using custom URL: {hf_url}")

    try:
        processor = GoogleDriveBatchProcessor(hf_space_url=hf_url)
        run_once = bool(args) and args[-1] == "once"
        if not run_once:
            # Default: keep polling the inbox until interrupted.
            processor.watch_and_process(interval=30)
            return
        # One-shot mode: drain the inbox and exit.
        files = processor.list_inbox_files()
        if not files:
            print("\nπ No files in inbox")
            return
        print(f"\n㪠Found {len(files)} file(s) to process")
        for file_info in files:
            processor.process_file(file_info)
    except FileNotFoundError as e:
        print(f"\nβ {e}")
    except Exception as e:
        print(f"\nβ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()