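# NOTE: the three lines that stood here ("Spaces:" / "Sleeping" / "Sleeping")
# were the Hugging Face Spaces page banner captured with the file; they are
# not part of the program.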
import os | |
import sys | |
import gradio as gr | |
import json | |
import base64 | |
import logging | |
from pathlib import Path | |
import uuid | |
import re | |
from datetime import datetime | |
from typing import Dict, List, Optional, Tuple | |
import pandas as pd | |
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Fix the model path for HF Space deployment
if os.path.exists('final_optimized_model'):
    # Running in HF Space: the model directory sits in the working directory.
    MODEL_PATH = 'final_optimized_model'
else:
    # Running locally: the model directory sits one level above this file's folder.
    MODEL_PATH = os.path.join(Path(__file__).parent.parent, 'final_optimized_model')

# Import the ML components
sys.path.insert(0, str(Path(__file__).parent))

# Let the import itself decide whether ml_suite is available. Checking
# os.path.exists('ml_suite') only looks in the current working directory,
# which can disagree with the sys.path entry added above.
try:
    # Override the config to use the local model path. This is done before
    # importing the predictor so the predictor sees the overridden value.
    import ml_suite.config as config
    config.FINE_TUNED_MODEL_DIR = MODEL_PATH
    from ml_suite.predictor import initialize_predictor, get_ai_prediction_for_email, is_predictor_ready, get_model_status
except ImportError as exc:
    # The rest of the app checks model_ready before touching the predictor,
    # so it can keep running without the ML suite.
    model_ready = False
    logger.error("ML suite not found (%s)", exc)
else:
    # Initialize the predictor once when the app starts
    logger.info("Initializing AI model from %s...", MODEL_PATH)
    try:
        initialize_predictor(logger)
        model_ready = is_predictor_ready()
    except Exception:
        # Top-level boundary: a broken model must not prevent the UI from starting.
        logger.exception("AI model initialization raised an exception")
        model_ready = False
    logger.info("Model initialization status: %s", 'Ready' if model_ready else 'Failed')

# Store session data: maps session id -> {'emails', 'scan_history', 'settings'}
session_data = {}
def create_session():
    """Register a new, empty session in ``session_data`` and return its id.

    The id is a random UUID4 string. The stored record starts with no
    emails, no scan history, AI classification enabled and a confidence
    threshold of 0.5.
    """
    new_id = str(uuid.uuid4())
    default_settings = {
        'ai_enabled': True,
        'confidence_threshold': 0.5,
    }
    session_data[new_id] = {
        'emails': [],
        'scan_history': [],
        'settings': default_settings,
    }
    return new_id
def parse_email_batch(email_text):
    """Parse pasted batch text into a list of email dicts.

    Recognised line prefixes (case-insensitive, after stripping whitespace):

    * ``---``       ends the current email and starts a new one
    * ``From:``     sets the sender
    * ``Subject:``  sets the subject; following non-empty lines are appended
                    to the subject (space-separated) until another marker
    * ``Body:``     starts the body; text on the same line and every following
                    non-empty line is appended, each terminated by a newline

    Lines are stripped and blank lines are dropped. A header marker is honoured
    wherever it appears, including inside a body. An email is kept only when it
    has a subject or a body.

    Args:
        email_text: Raw text pasted by the user; ``None`` or empty yields ``[]``.

    Returns:
        A list of ``{'subject': str, 'body': str, 'sender': str}`` dicts.
    """
    if not email_text:
        return []

    def _blank_email():
        return {'subject': '', 'body': '', 'sender': ''}

    emails = []
    current = _blank_email()
    section = None

    for raw_line in email_text.strip().split('\n'):
        line = raw_line.strip()
        lowered = line.lower()

        if line.startswith('---'):  # Email separator
            if current['subject'] or current['body']:
                emails.append(current)
            current = _blank_email()
            section = None
        elif lowered.startswith('from:'):
            current['sender'] = line[len('from:'):].strip()
            section = 'sender'
        elif lowered.startswith('subject:'):
            current['subject'] = line[len('subject:'):].strip()
            section = 'subject'
        elif lowered.startswith('body:'):
            section = 'body'
            # Keep text written on the marker line itself ("Body: hello");
            # previously it was silently discarded.
            inline_text = line[len('body:'):].strip()
            if inline_text:
                current['body'] += inline_text + '\n'
        elif line and section == 'body':
            current['body'] += line + '\n'
        elif line and section == 'subject':
            # Continuation of a subject that wrapped onto the next line.
            current['subject'] += ' ' + line

    # Add last email (the input need not end with a separator)
    if current['subject'] or current['body']:
        emails.append(current)
    return emails
def classify_email(email_data):
    """Run the AI predictor on one email and return its result.

    Args:
        email_data: Mapping with optional 'subject', 'body' and 'sender' keys.

    Returns:
        Whatever ``get_ai_prediction_for_email`` returns for the email
        (callers in this file read its 'prediction' and 'confidence' keys),
        or ``{'prediction': 'error', 'confidence': 0, 'error': <message>}``
        when the model is not ready or the prediction raises.
    """
    def _failure(message):
        return {
            'prediction': 'error',
            'confidence': 0,
            'error': message,
        }

    if not model_ready:
        return _failure('Model not ready')

    try:
        # Prepare email data for predictor; the snippet is the first 200
        # characters of the body and each request gets a fresh random id.
        body_text = email_data.get('body', '')
        payload = {
            'snippet': body_text[:200],
            'subject': email_data.get('subject', ''),
            'body': body_text,
            'sender': email_data.get('sender', 'unknown@example.com'),
            'id': str(uuid.uuid4()),
        }
        return get_ai_prediction_for_email(payload)
    except Exception as exc:
        reason = str(exc)
        logger.error("Classification error: %s", reason)
        return _failure(reason)
def _truncate_preview(text, limit):
    """Return *text* cut to *limit* characters, adding '...' only if it was cut."""
    if len(text) > limit:
        return text[:limit] + '...'
    return text


def scan_emails(session_id, email_batch_text, ai_enabled, confidence_threshold):
    """Scan a batch of emails and report the classification results.

    Args:
        session_id: Id of an existing session; an unknown id (including
            ``None``) causes a new session to be created.
        email_batch_text: Raw batch text in the format understood by
            ``parse_email_batch``.
        ai_enabled: Whether the user asked for AI classification.
        confidence_threshold: Minimum confidence for a prediction to count
            as Unsubscribe/Important instead of Uncertain.

    Returns:
        ``(summary_markdown, dataframe_or_None, session_id)``. The DataFrame
        is ``None`` only when no email could be parsed. The returned session
        id is the one actually used, which may be newly created.

    Side effects:
        Updates the session's settings, appends every result to the session's
        'emails' list and appends one entry to its 'scan_history'.
    """
    # NOTE(review): the status glyphs below were reconstructed from
    # mis-encoded text (UTF-8 decoded as ISO-8859-7) - confirm the intended emoji.
    if session_id not in session_data:
        session_id = create_session()
    session = session_data[session_id]
    session['settings']['ai_enabled'] = ai_enabled
    session['settings']['confidence_threshold'] = confidence_threshold

    # Parse emails (tolerate a missing textbox value)
    emails = parse_email_batch(email_batch_text or '')
    if not emails:
        return "No valid emails found in input.", None, session_id

    use_ai = bool(ai_enabled and model_ready)
    results = []
    unsubscribe_count = 0
    important_count = 0
    uncertain_count = 0
    skipped_count = 0

    for email in emails:
        if use_ai:
            classification = classify_email(email)
            prediction = classification.get('prediction', 'unknown')
            confidence = classification.get('confidence') or 0
            # A failed classification is never trusted, whatever the threshold.
            if prediction == 'error' or confidence < confidence_threshold:
                uncertain_count += 1
                status = "❓ Uncertain"
            elif prediction == 'unsubscribe':
                unsubscribe_count += 1
                status = "✅ Unsubscribe"
            else:
                important_count += 1
                status = "⚠️ Important"
        else:
            prediction = 'not_analyzed'
            confidence = 0
            skipped_count += 1
            # Tell the user why nothing was analysed: they switched AI off,
            # or they left it on but the model failed to load.
            if ai_enabled:
                status = "⏭️ Skipped (model not available)"
            else:
                status = "⏭️ Skipped (AI disabled)"

        result = {
            # The parser always sets these keys (possibly to ''), so fall
            # back on emptiness rather than on a missing key.
            'subject': email.get('subject') or 'No subject',
            'sender': email.get('sender') or 'Unknown',
            'prediction': prediction,
            'confidence': confidence,
            'status': status,
            'body_preview': _truncate_preview(email.get('body', ''), 100),
        }
        results.append(result)
        session['emails'].append(result)

    # Create summary
    summary_parts = [
        "\n## Scan Results\n",
        f"**Total Emails Scanned:** {len(results)}\n",
        f"**Unsubscribe Confirmations:** {unsubscribe_count}\n",
        f"**Important Emails:** {important_count}\n",
        f"**Uncertain:** {uncertain_count}\n",
    ]
    if skipped_count:
        summary_parts.append(f"**Skipped:** {skipped_count}\n")
    summary_parts.append("### Detailed Results:\n")

    for i, result in enumerate(results, 1):
        summary_parts.append(f"\n**{i}. {result['subject']}**\n")
        summary_parts.append(f"- From: {result['sender']}\n")
        summary_parts.append(f"- Status: {result['status']}\n")
        if ai_enabled and result['confidence'] > 0:
            summary_parts.append(f"- Confidence: {result['confidence']:.2%}\n")
        summary_parts.append(f"- Preview: {result['body_preview']}\n")
    summary = "".join(summary_parts)

    # Create DataFrame for display (results is non-empty at this point)
    df = pd.DataFrame([
        {
            'Subject': r['subject'],
            'From': r['sender'],
            'Status': r['status'],
            'Confidence': f"{r['confidence']:.2%}" if r['confidence'] > 0 else "N/A",
            'Preview': _truncate_preview(r['body_preview'], 50),
        }
        for r in results
    ])

    # Add to scan history
    session['scan_history'].append({
        'timestamp': datetime.now().isoformat(),
        'count': len(results),
        'unsubscribe': unsubscribe_count,
        'important': important_count,
    })
    return summary, df, session_id
def get_statistics(session_id):
    """Return a Markdown report of the session's cumulative scan statistics.

    Args:
        session_id: Id of the session to report on.

    Returns:
        A Markdown string with totals summed over the session's
        'scan_history' plus static model information, or the plain message
        "No session data available." when the id is unknown.
    """
    if session_id not in session_data:
        return "No session data available."

    history = session_data[session_id]['scan_history']
    total_scans = len(history)
    total_emails = sum(scan['count'] for scan in history)
    total_unsubscribe = sum(scan['unsubscribe'] for scan in history)
    total_important = sum(scan['important'] for scan in history)

    # NOTE(review): glyphs reconstructed from mis-encoded text (UTF-8 decoded
    # as ISO-8859-7) - confirm the intended emoji.
    model_status = '🟢 Ready' if model_ready else '🔴 Not Available'

    report_lines = [
        "",
        "## Session Statistics",
        f"**Total Scans:** {total_scans}",
        f"**Total Emails Processed:** {total_emails}",
        f"**Unsubscribe Emails Found:** {total_unsubscribe}",
        f"**Important Emails Protected:** {total_important}",
        "### Model Information:",
        "- **Model:** DeBERTa-v3-small",
        "- **Training Samples:** 20,000",
        "- **Accuracy:** 100% on test set",
        f"- **Status:** {model_status}",
        "",
    ]
    return "\n".join(report_lines)
# Create Gradio interface
#
# NOTE(review): the emoji in the labels and Markdown below were reconstructed
# from mis-encoded text (UTF-8 decoded as ISO-8859-7), and the layout nesting
# was reconstructed from statement order because the original indentation was
# lost - confirm both against the deployed app.

_INTRO_MARKDOWN = """
# 📧 Gmail Unsubscriber - Web Version
This is a web-based version of the Gmail Unsubscriber application that uses AI to classify emails as unsubscribe confirmations or important emails.
**Note:** This web version demonstrates the AI classification capabilities. For full Gmail integration with OAuth, please use the desktop version.
"""

_EMAIL_BATCH_PLACEHOLDER = "\n".join([
    "Paste multiple emails here. Format each email as:",
    "From: sender@example.com",
    "Subject: Your subscription has been cancelled",
    "Body:",
    "We're sorry to see you go! Your subscription has been cancelled.",
    "---",
    "From: bank@example.com",
    "Subject: Important: Security Alert",
    "Body:",
    "We detected unusual activity on your account. Please review immediately.",
    "---",
    "(Continue with more emails...)",
])

_ABOUT_MARKDOWN = """
## About Gmail Unsubscriber
This application uses a fine-tuned DeBERTa-v3-small model to classify emails automatically.
### Features:
- 🤖 AI-powered email classification
- 📊 Batch processing capabilities
- 📈 Real-time statistics
- 🎯 Adjustable confidence thresholds
### Model Performance:
- **Accuracy:** 100% on test set
- **F1 Score:** 1.0 for both classes
- **Model Size:** 552MB
- **Training Data:** 20,000 email samples
### Desktop Version Features (Not available in web):
- Gmail OAuth integration
- Automatic email fetching
- One-click unsubscribe
- Email archiving
- Persistent user settings
"""

with gr.Blocks(title="Gmail Unsubscriber - Full Web Version", theme=gr.themes.Soft()) as demo:
    # Start without a session id. Calling create_session() here would run once
    # at build time, and every visitor would receive a copy of the same id and
    # therefore share one session_data entry. The id is created per visitor in
    # the page-load handler below.
    session_state = gr.State(None)

    gr.Markdown(_INTRO_MARKDOWN)

    with gr.Tabs():
        with gr.TabItem("🔍 Email Scanner"):
            gr.Markdown("### Batch Email Classification")
            with gr.Row():
                with gr.Column(scale=2):
                    email_input = gr.Textbox(
                        lines=15,
                        placeholder=_EMAIL_BATCH_PLACEHOLDER,
                        label="Email Batch Input",
                    )
                with gr.Column(scale=1):
                    ai_enabled = gr.Checkbox(value=True, label="Enable AI Classification")
                    confidence_threshold = gr.Slider(
                        minimum=0.1,
                        maximum=0.9,
                        value=0.5,
                        step=0.1,
                        label="Confidence Threshold",
                    )
                    scan_btn = gr.Button("🔍 Scan Emails", variant="primary", size="lg")
            scan_output = gr.Markdown()
            results_table = gr.DataFrame(label="Scan Results")

        with gr.TabItem("📊 Statistics"):
            stats_output = gr.Markdown()
            refresh_stats_btn = gr.Button("🔄 Refresh Statistics")

        with gr.TabItem("🧪 Test Single Email"):
            gr.Markdown("### Test AI Classification on a Single Email")
            with gr.Row():
                with gr.Column():
                    test_subject = gr.Textbox(label="Subject", placeholder="Your subscription has been cancelled")
                    test_sender = gr.Textbox(label="From", placeholder="noreply@example.com")
                    test_body = gr.Textbox(
                        lines=5,
                        label="Body",
                        placeholder="We're sorry to see you go! Your subscription has been successfully cancelled.",
                    )
                    test_btn = gr.Button("🤖 Classify", variant="primary")
                with gr.Column():
                    test_output = gr.Markdown()

        with gr.TabItem("ℹ️ About"):
            gr.Markdown(_ABOUT_MARKDOWN)

    # Event handlers
    def test_single_email(subject, sender, body):
        """Classify one email typed into the test tab and return Markdown.

        Returns a prompt when both subject and body are empty, an error line
        when the classifier reports an error, and otherwise the predicted
        label, its confidence and a one-sentence description.
        """
        if not subject and not body:
            return "Please enter email content to test."

        result = classify_email({
            'subject': subject,
            'sender': sender,
            'body': body,
        })
        if result.get('error'):
            return f"❌ Error: {result['error']}"

        prediction = result.get('prediction', 'unknown')
        confidence = result.get('confidence') or 0
        if prediction == 'unsubscribe':
            emoji = "✅"
            description = "This appears to be an unsubscribe confirmation."
        elif prediction == 'important':
            emoji = "⚠️"
            description = "This appears to be an important email."
        else:
            emoji = "❓"
            description = "Unable to classify with confidence."

        return "\n".join([
            "",
            "### Classification Result",
            f"{emoji} **{prediction.upper()}**",
            f"**Confidence:** {confidence:.2%}",
            f"{description}",
            "",
        ])

    def start_session():
        """Create a session for the visitor who just loaded the page.

        Returns the new session id (stored in ``session_state``) and the
        initial statistics Markdown for that session.
        """
        # NOTE(review): sessions are never evicted from session_data, so one
        # entry per page load accumulates for the lifetime of the process.
        new_session_id = create_session()
        return new_session_id, get_statistics(new_session_id)

    # Connect event handlers
    scan_btn.click(
        fn=scan_emails,
        inputs=[session_state, email_input, ai_enabled, confidence_threshold],
        outputs=[scan_output, results_table, session_state],
    )
    refresh_stats_btn.click(
        fn=get_statistics,
        inputs=[session_state],
        outputs=[stats_output],
    )
    test_btn.click(
        fn=test_single_email,
        inputs=[test_subject, test_sender, test_body],
        outputs=[test_output],
    )

    # Create the per-visitor session and load its initial statistics
    demo.load(
        fn=start_session,
        inputs=None,
        outputs=[session_state, stats_output],
    )

if __name__ == "__main__":
    demo.launch()