Spaces:
Runtime error
Runtime error
Sami (aider)
feat: add send_email_via_aws, update_progress_bar, notify_user, and scrape_emails functions
660a7b8
import os | |
import re | |
import psycopg2 | |
from psycopg2 import pool | |
import requests | |
import pandas as pd | |
from datetime import datetime | |
from bs4 import BeautifulSoup | |
from googlesearch import search | |
import gradio as gr | |
import boto3 | |
from botocore.exceptions import NoCredentialsError, PartialCredentialsError | |
import openai | |
from requests.adapters import HTTPAdapter | |
from urllib3.util.retry import Retry | |
import logging | |
# Configuration | |
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID", "your-aws-access-key") | |
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY", "your-aws-secret-key") | |
region_name = "us-east-1" | |
openai.api_key = os.getenv("OPENAI_API_KEY", "your-openai-api-key") | |
openai.api_base = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1/") | |
openai_model = "text-davinci-003" | |
db_params = { | |
"user": "your-postgres-user", | |
"password": "your-postgres-password", | |
"host": "your-postgres-host", | |
"port": "your-postgres-port", | |
"dbname": "your-postgres-dbname", | |
"sslmode": "require" | |
} | |
# Initialize AWS SES client | |
ses_client = boto3.client('ses', | |
aws_access_key_id=aws_access_key_id, | |
aws_secret_access_key=aws_secret_access_key, | |
region_name=region_name) | |
# Connection pool for PostgreSQL | |
db_pool = pool.SimpleConnectionPool(1, 10, **db_params) | |
# HTTP session with retry strategy | |
session = requests.Session() | |
retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504]) | |
session.mount('https://', HTTPAdapter(max_retries=retries)) | |
# Setup logging | |
logging.basicConfig(level=logging.INFO, filename='app.log', filemode='a', | |
format='%(asctime)s - %(levelname)s - %(message)s') | |
# Initialize database | |
def init_db(): | |
conn = None | |
try: | |
conn = db_pool.getconn() | |
conn.close() | |
logging.info("Successfully connected to the database!") | |
except Exception as e: | |
logging.error(f"Failed to connect to the database: {e}") | |
finally: | |
if conn: | |
db_pool.putconn(conn) | |
init_db() | |
# Fetch the most recent or frequently used template ID | |
def fetch_recent_template_id(): | |
conn = None | |
try: | |
conn = db_pool.getconn() | |
with conn.cursor() as cursor: | |
cursor.execute('SELECT id FROM email_templates ORDER BY last_used DESC LIMIT 1') | |
recent_template_id = cursor.fetchone()[0] | |
return recent_template_id | |
except Exception as e: | |
logging.error(f"Failed to fetch the most recent template: {e}") | |
return None | |
finally: | |
if conn: | |
db_pool.putconn(conn) | |
# Auto-save drafts every few seconds | |
def auto_save_drafts(draft_content): | |
conn = None | |
try: | |
conn = db_pool.getconn() | |
with conn.cursor() as cursor: | |
cursor.execute('INSERT INTO drafts (content, saved_at) VALUES (%s, %s) RETURNING id', | |
(draft_content, datetime.now())) | |
conn.commit() | |
logging.info("Draft saved successfully.") | |
except Exception as e: | |
logging.error(f"Failed to save draft: {e}") | |
finally: | |
if conn: | |
db_pool.putconn(conn) | |
# Auto-restore drafts when user returns | |
def auto_restore_drafts(): | |
conn = None | |
try: | |
conn = db_pool.getconn() | |
with conn.cursor() as cursor: | |
cursor.execute('SELECT content FROM drafts ORDER BY saved_at DESC LIMIT 1') | |
draft_content = cursor.fetchone()[0] | |
logging.info("Draft restored successfully.") | |
return draft_content | |
except Exception as e: | |
logging.error(f"Failed to restore draft: {e}") | |
return "" | |
finally: | |
if conn: | |
db_pool.putconn(conn) | |
# Save each search query automatically | |
def save_search_query(query): | |
conn = None | |
try: | |
conn = db_pool.getconn() | |
with conn.cursor() as cursor: | |
cursor.execute('INSERT INTO search_terms (status, fetched_emails, last_processed_at) VALUES (%s, %s, %s) RETURNING id', | |
('pending', 0, None)) | |
search_term_id = cursor.fetchone()[0] | |
conn.commit() | |
return search_term_id | |
except Exception as e: | |
logging.error(f"Failed to save search query: {e}") | |
return None | |
finally: | |
if conn: | |
db_pool.putconn(conn) | |
# Fetch unsent emails and auto-sort them by priority or date | |
def fetch_unsent_emails(): | |
conn = None | |
try: | |
conn = db_pool.getconn() | |
with conn.cursor() as cursor: | |
cursor.execute('SELECT * FROM generated_emails WHERE email_sent=0 ORDER BY email_id') | |
unsent_emails = cursor.fetchall() | |
return unsent_emails | |
except Exception as e: | |
logging.error(f"Failed to fetch unsent emails: {e}") | |
return [] | |
finally: | |
if conn: | |
db_pool.putconn(conn) | |
# Enhanced function for tracking progress and sending emails | |
def track_progress_and_send(from_address, reply_to): | |
progress = 0 | |
total_emails = len(fetch_unsent_emails()) | |
for email in fetch_unsent_emails(): | |
send_email_via_aws(email[2], email[3], email[4], from_address, reply_to) | |
progress += 1 | |
update_progress_bar(progress / total_emails) | |
notify_user(f"Sent {progress}/{total_emails} emails.") | |
return "All emails sent successfully." | |
# Function to send emails via AWS SES | |
def send_email_via_aws(to_address, subject, body_html, from_address, reply_to): | |
try: | |
response = ses_client.send_email( | |
Source=from_address, | |
Destination={'ToAddresses': [to_address]}, | |
Message={ | |
'Subject': {'Data': subject}, | |
'Body': { | |
'Html': {'Data': body_html} | |
} | |
}, | |
ReplyToAddresses=[reply_to] | |
) | |
return response['MessageId'] | |
except (NoCredentialsError, PartialCredentialsError) as e: | |
logging.error(f"AWS credentials error: {e}") | |
return None | |
except Exception as e: | |
logging.error(f"Failed to send email via AWS SES: {e}") | |
return None | |
# Function to update the progress bar | |
def update_progress_bar(progress): | |
# Implement your code to update the progress bar here | |
pass | |
# Function to notify the user with a message | |
def notify_user(message): | |
# Implement your code to notify the user here | |
pass | |
# Function to scrape emails from Google search results | |
def scrape_emails(search_query, num_results): | |
results = [] | |
search_urls = list(search(search_query, num_results=num_results)) | |
for url in search_urls: | |
try: | |
response = session.get(url, timeout=10) | |
response.encoding = 'utf-8' | |
soup = BeautifulSoup(response.text, 'html.parser') | |
emails = find_emails(response.text) | |
for email in emails: | |
results.append((search_query, email, url)) | |
save_lead(search_query, email, url) | |
except Exception as e: | |
logging.error(f"Failed to scrape {url}: {e}") | |
return pd.DataFrame(results, columns=["Search Query", "Email", "URL"]) | |
# ... rest of code ... | |
# Gradio interface | |
with gr.Blocks() as gradio_app: | |
gr.Markdown("# Email Campaign Management System") | |
with gr.Tab("Search Emails"): | |
search_query = gr.Textbox(label="Search Query", placeholder="e.g., 'Potential Customers in Madrid'") | |
num_results = gr.Slider(1, 100, value=10, step=1, label="Number of Results") | |
search_button = gr.Button("Search") | |
results = gr.Dataframe(headers=["Search Query", "Email", "URL"]) | |
search_button.click(lambda query, num_results: scrape_emails(query, num_results), | |
inputs=[search_query, num_results], outputs=[results]) | |
with gr.Tab("Create Email Template"): | |
# ... rest of code ... | |
with gr.Tab("Generate and Send Emails"): | |
# ... rest of code ... | |
with gr.Tab("Bulk Process and Send"): | |
# ... rest of code ... | |
with gr.Tab("Manage Search Queries"): | |
# ... rest of code ... | |
gradio_app.launch() | |