cold_mail_outreach / tools.py
Ashhar
add email stats
4e472cd
import gspread
import os
import json
from sendgrid import SendGridAPIClient
import streamlit as st
import pandas as pd
import plotly.express as px
from dotenv import load_dotenv
load_dotenv()
GCP_JSON_KEY = os.environ.get("GCP_JSON_KEY")
SENDGRID_API_KEY = os.environ.get("SENDGRID_API_KEY")
MY_EMAIL = "ashharakhlaque@gmail.com"
MY_NAME = "Ashhar Akhlaque"
TEMPLATE_COL_NAME = "Template"
print(SENDGRID_API_KEY)
sg = SendGridAPIClient(SENDGRID_API_KEY)
def saveProfileDetailsInGSheet(
emailPurpose,
aboutYou,
recipientIndustry,
recipientRole,
specificDetails
):
client = gspread.service_account_from_dict(json.loads(GCP_JSON_KEY))
workBook = "Cold Mail Outreach - User Profiles"
try:
spreadsheet = client.open(workBook)
except gspread.SpreadsheetNotFound:
print(f"Creating new sheet: {workBook}")
spreadsheet = client.create(workBook)
print(f"Created new sheet: {spreadsheet.url}")
spreadsheet.share(
MY_EMAIL,
perm_type='user',
role='writer',
)
sheet = spreadsheet.worksheet("Profiles & Templates")
sheet.append_row([
emailPurpose,
aboutYou,
recipientIndustry,
recipientRole,
specificDetails
])
if not (emailPurpose and aboutYou):
return {}
return {
"response": "Saved profile details in Google Sheet.",
"display": {
"text": f"`Saved profile details in Google Sheet` [Link]({spreadsheet.url})",
"icon": "icons/completed-task.png",
}
}
def saveTemplateInGSheet(template: str):
client = gspread.service_account_from_dict(json.loads(GCP_JSON_KEY))
workBook = "Cold Mail Outreach - User Profiles"
try:
spreadsheet = client.open(workBook)
except gspread.SpreadsheetNotFound:
print(f"Sheet not found: {workBook}")
return {
"response": "Failed to save template. Sheet not found.",
"display": {
"text": "`Failed to save template. Sheet not found.`",
"icon": "icons/error.png",
}
}
sheet = spreadsheet.worksheet("Profiles & Templates")
headers = sheet.row_values(1)
templateColIndex = headers.index(TEMPLATE_COL_NAME) + 1
lastRow = len([row for row in sheet.get_all_values() if any(row)])
sheet.update_cell(lastRow, templateColIndex, template)
return {
"response": "Saved template in Google Sheet.",
"display": {
"text": f"`Saved template in Google Sheet` [Link]({spreadsheet.url})",
"icon": "icons/completed-task.png",
}
}
def sendEmail(
toEmail: str,
subject: str,
htmlContent: str
):
data = {
"personalizations": [{
"to": [
{
"email": toEmail,
}
],
"subject": subject
}],
"from": {
"email": MY_EMAIL,
"name": MY_NAME
},
"reply_to": {
"email": MY_EMAIL,
"name": MY_NAME
},
"content": [{
"type": "text/html",
"value": htmlContent
}]
}
try:
response = sg.client.mail.send.post(request_body=data)
print(f"{response.status_code=}")
st.session_state["emailSent"] = True
return {
"response": "Email sent",
"display": {
"text": f"`Email sent to {toEmail}`",
"icon": "icons/mail.png",
}
}
except Exception as e:
print(e.message)
return {
"response": f"Failed to send email. Error: {e.message}",
"display": {
"text": f"`Failed to send email to {toEmail}`",
"icon": "icons/error.png",
}
}
toolsInfo = {
"saveProfileDetailsInGSheet": {
"func": saveProfileDetailsInGSheet,
"schema": {
"type": "function",
"function": {
"name": "saveProfileDetailsInGSheet",
"description": "Saves the email profile details in Google Sheet",
"parameters": {
"type": "object",
"properties": {
"emailPurpose": {
"type": "string",
"description": "Purpose of the email"
},
"aboutYou": {
"type": "string",
"description": "A bit about you that's required for email content"
},
"recipientIndustry": {
"type": "string",
"description": "Industry of the recipient"
},
"recipientRole": {
"type": "string",
"description": "Recipient's role in the company"
},
"specificDetails": {
"type": "string",
"description": "Any specific details about the email"
}
},
"required": ["emailPurpose", "aboutYou", "recipientIndustry"]
}
}
},
},
"saveTemplateInGSheet": {
"func": saveTemplateInGSheet,
"schema": {
"type": "function",
"function": {
"name": "saveTemplateInGSheet",
"description": "Saves the email template in Google Sheet",
"parameters": {
"type": "object",
"properties": {
"template": {
"type": "string",
"description": "Email template"
}
},
"required": ["template"]
}
}
},
},
"sendEmail": {
"func": sendEmail,
"schema": {
"type": "function",
"function": {
"name": "sendEmail",
"description": "Sends an email to the user",
"parameters": {
"type": "object",
"properties": {
"toEmail": {
"type": "string",
"description": "Email address of the recipient"
},
"subject": {
"type": "string",
"description": "Subject of the email"
},
"htmlContent": {
"type": "string",
"description": "HTML content of the email"
}
},
"required": ["toEmail", "subject", "htmlContent"]
}
}
},
},
}
def showEmailStats():
params = {'start_date': '2024-09-07'}
response = sg.client.stats.get(
query_params=params
)
stats = json.loads(response.body)
# Convert stats to a pandas DataFrame
df = pd.DataFrame([
{
'date': item['date'],
**item['stats'][0]['metrics']
} for item in stats
])
df['date'] = pd.to_datetime(df['date'])
# Display overall stats
st.subheader("Email Campaign Overview")
col1, col2, col3, col4, col5 = st.columns([1, 1, 1, 1, 4])
col1.metric("Total Sent", df['requests'].sum())
col2.metric("Delivered", df['delivered'].sum())
col3.metric("Opens", df['opens'].sum())
col4.metric("Bounces", df['bounces'].sum())
col1, col2, col3 = st.columns([3, 1, 3])
# Create line chart for key metrics
figLine = px.line(
df,
x='date',
y=['delivered', 'opens', 'unique_opens', 'bounces'],
title='Key Metrics Over Time',
color_discrete_map={ # Define custom colors
'delivered': 'green',
'opens': 'blue',
'unique_opens': 'purple',
'bounces': 'red'
},
labels={'date': 'Date', 'value': 'Count'},
markers=True,
height=400,
)
figLine.update_layout(
legend=dict(
orientation="h", # Horizontal orientation
yanchor="bottom",
y=1.02, # Position above the chart
xanchor="right",
x=1,
title=None, # Remove legend title
)
)
col1.plotly_chart(figLine)
# Create bar chart for email status
dfStatus = df[['delivered', 'bounces', 'invalid_emails']].sum().reset_index()
dfStatus.columns = ['status', 'count']
figBar = px.bar(
dfStatus,
x='status',
y='count',
title='Email Status Distribution',
color='status', # Use status for color
color_discrete_map={ # Define custom colors
'delivered': 'green',
'bounces': 'red',
'invalid_emails': 'orange'
},
labels={'status': 'Status', 'count': 'Count'},
text='count',
height=400,
)
figBar.update_layout(
legend=dict(
orientation="h", # Horizontal orientation
yanchor="bottom",
y=1.02, # Position above the chart
xanchor="right",
x=1,
title=None, # Remove legend title
)
)
col3.plotly_chart(figBar)