Spaces:
Runtime error
Runtime error
import openai
from io import BytesIO
from config import config
import re
import pandas as pd
import random
import boto3
# Module-level S3 resource handle; no explicit credentials are passed here.
s3 = boto3.resource('s3')
import streamlit as st
from sklearn.metrics import r2_score
import tempfile
from io import StringIO
import joblib
# Module-level S3 client, also created without explicit credentials.
# get_files_from_aws() does not use it: that function builds its own client
# from st.secrets.
s3_client = boto3.client('s3')
# Configure the OpenAI SDK at import time with the key held in the project config.
openai.api_key = config.OPEN_API_KEY
def ask_chat_gpt(prompt, model=config.OPENAI_MODEL_TYPE, temp=0, max_tokens=500):
    """Send ``prompt`` to the OpenAI completion endpoint and return the reply.

    Args:
        prompt: Text submitted to the model.
        model: Engine name forwarded to the API; defaults to
            ``config.OPENAI_MODEL_TYPE`` (evaluated once, at definition time).
        temp: Sampling temperature forwarded to the API.
        max_tokens: Upper bound on the number of generated tokens.

    Returns:
        The text of the first returned choice, with leading and trailing
        whitespace stripped.
    """
    request_options = {
        "engine": model,
        "prompt": prompt,
        "max_tokens": max_tokens,
        "stop": None,
        "temperature": temp,
    }
    completion = openai.Completion.create(**request_options)
    first_choice = completion.choices[0]
    return first_choice.text.strip()
def chat_gpt_user_input_loop():
    """Run an interactive console Q&A session about email optimization.

    Repeatedly reads a question from standard input, sends it (prefixed with
    the on-screen prompt text) to ``ask_chat_gpt`` and prints the answer.

    The loop has no exit condition of its own: it runs until ``input`` or
    ``ask_chat_gpt`` raises (for example ``EOFError`` when stdin is closed,
    or ``KeyboardInterrupt``), and that exception propagates to the caller.
    """
    prompt = "Ask me anything on regarding email optimization. "
    # Iterate rather than self-recurse so that a long session cannot exhaust
    # the interpreter's call stack.
    while True:
        user_input = input(prompt)
        response = ask_chat_gpt(prompt + user_input)
        # Show the answer to the user; without this the reply is never seen.
        print(response)
def _build_rewrite_prompt(email_body, campaign_type, industry, approx_chars, expand):
    """Compose the rewrite instruction sent to the model for one length target."""
    prompt = (
        "Rewrite this email keeping relevant information (people, date, location): "
        f"{email_body}. "
        f"Optimize the email for the {campaign_type} campaign type and {industry} industry. "
        f"The email body should be around {approx_chars} characters in length."
    )
    if expand:
        prompt += " Add more information and description as needed."
    return prompt


def generate_example_email_with_context(email_body, selected_campaign_type, selected_industry, selected_variable, chars_out, dropdown_cc):
    """Rewrite ``email_body`` for the first length candidate the user selected.

    The candidates in ``chars_out`` are checked in order; the first one whose
    character count (as a string) is found in ``dropdown_cc`` is used to build
    a rewrite prompt, which is sent to ``ask_chat_gpt``.

    Args:
        email_body: Original email text to rewrite.
        selected_campaign_type: Campaign type named in the prompt.
        selected_industry: Industry named in the prompt.
        selected_variable: Accepted for interface compatibility; not used.
        chars_out: Sequence of one to three candidates. Element ``[0]`` of
            each candidate is a character count (a number, since 200 is
            added to it).
        dropdown_cc: The user's selection; tested with ``str(count) in
            dropdown_cc``, so either a string or a container of strings works.

    Returns:
        The rewritten email text, or ``None`` when ``chars_out`` does not hold
        between one and three candidates or no candidate matches
        ``dropdown_cc``.
    """
    # Only one to three candidates are supported; any other size yields None.
    if not 1 <= len(chars_out) <= 3:
        return None

    for position, candidate in enumerate(chars_out):
        target_chars = candidate[0]
        if str(target_chars) not in dropdown_cc:
            continue
        # The requested length and the token budget both get 200 of headroom
        # on top of the candidate character count.
        budget = target_chars + 200
        prompt = _build_rewrite_prompt(
            email_body,
            selected_campaign_type,
            selected_industry,
            budget,
            # Only the second candidate asks the model to add more detail.
            expand=(position == 1),
        )
        return ask_chat_gpt(prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=budget)

    return None
def optimize_email_prompt_multi(email_body, dropdown_opt):
    """Rewrite an email according to the selected optimization prompts.

    Args:
        email_body: Original email text.
        dropdown_opt: Iterable of optimization prompt strings; they are joined
            with ", " and embedded in the request sent to ``ask_chat_gpt``.

    Returns:
        A tuple ``(rewritten_email, character_count, url_count)`` where
        ``character_count`` counts only alphabetic characters of the rewritten
        email and ``url_count`` is the number of http(s) URLs found in it.

    Side effects:
        Prints the rewritten email to standard output.
    """
    joined_options = ", ".join(dropdown_opt)
    request_text = "".join([
        "Rewrite this email keeping relevant information (people, date, location): ",
        email_body,
        ". Optimize the email with these prompts: ",
        joined_options,
        ". Include examples when needed. The email body should be optimized for characters in length.",
    ])
    rewritten_email = ask_chat_gpt(request_text, temp=0.5, max_tokens=1000)

    # Only alphabetic characters count; spaces, digits and punctuation are skipped.
    alpha_total = len([ch for ch in rewritten_email if ch.isalpha()])

    # Each match of the pattern is one http:// or https:// URL.
    url_pattern = r'(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)'
    url_total = len(re.findall(url_pattern, rewritten_email))

    for console_line in ("Email with Optimization:", rewritten_email, "\n"):
        print(console_line)

    return rewritten_email, alpha_total, url_total
def import_data(bucket, key):
    """Load the S3 object ``key`` in ``bucket`` via ``get_files_from_aws``.

    Returns whatever ``get_files_from_aws`` returns for that bucket and key.
    """
    loaded = get_files_from_aws(bucket=bucket, prefix=key)
    return loaded
def get_files_from_aws(bucket, prefix):
    """
    Download one object from an S3 bucket and parse it as CSV.

    bucket (STRING): bucket name
    prefix (STRING): key of the object inside the bucket

    Returns a pandas DataFrame built from the object's UTF-8 decoded content.
    Credentials are read from the Streamlit secrets "aws_id" and "aws_key".
    """
    credentials = {
        "aws_access_key_id": st.secrets["aws_id"],
        "aws_secret_access_key": st.secrets["aws_key"],
    }
    client = boto3.client('s3', **credentials)
    s3_object = client.get_object(Bucket=bucket, Key=prefix)
    # The whole payload is read into memory and decoded before parsing.
    csv_text = s3_object['Body'].read().decode('utf-8')
    return pd.read_csv(StringIO(csv_text))
def get_optimized_prediction(modellocation, model_filename, bucket_name, selected_variable, selected_industry,
                             char_cnt_uploaded, url_cnt_uploaded, industry_code_dict):
    """Predict the target rate for an optimized email and report it.

    Args:
        modellocation: Accepted for interface compatibility; not used.
        model_filename: Accepted for interface compatibility; not used.
        bucket_name: Accepted for interface compatibility; not used.
        selected_variable: Name of the target metric; only used in the
            printed message.
        selected_industry: Industry label, mapped to a numeric code through
            ``industry_code_dict``.
        char_cnt_uploaded: Character count of the email.
        url_cnt_uploaded: URL count of the email.
        industry_code_dict: Mapping from industry label to industry code. A
            label missing from the mapping yields ``None`` as the code.

    Returns:
        A tuple ``(char_cnt_uploaded, rate)``. For a non-negative prediction
        ``rate`` is a percentage rounded to two decimals (including the
        random uplift described below); for a negative prediction it is the
        raw model output rounded to two decimals.

    Side effects:
        Reads the model from ``models/models.sav`` and prints a summary to
        standard output.
    """
    # The model always comes from this fixed local file, regardless of the
    # modellocation / model_filename / bucket_name arguments.
    regr = joblib.load('models/models.sav')

    # Single-row feature frame, in the column order passed to the model.
    features = pd.DataFrame(
        {
            "industry_code": [industry_code_dict.get(selected_industry)],
            "character_cnt": [char_cnt_uploaded],
            "url_cnt": [url_cnt_uploaded],
        }
    )
    predicted_rate = regr.predict(features)[0]

    output_rate = round(predicted_rate, 4)
    if output_rate < 0:
        print("Sorry, Current model couldn't provide predictions on the target variable you selected.")
    else:
        print("Current Character Count in Your Optimized Email is:", char_cnt_uploaded)
        # Convert the fraction to a percentage.
        output_rate = round(output_rate * 100, 2)
        # NOTE(review): a random 1-5 point uplift is added to the model's
        # prediction, so the reported rate is not reproducible and is higher
        # than what the model returned. Confirm this is intended.
        rate_change = random.uniform(1, 5)
        output_rate += rate_change
        print("The model predicts that it achieves a", round(output_rate, 2), '%', selected_variable)
    return char_cnt_uploaded, round(output_rate, 2)