Spaces:
Runtime error
Runtime error
| import openai | |
| from io import BytesIO | |
| from config import config | |
| import re | |
| import pandas as pd | |
| import random | |
| import boto3 | |
| s3 = boto3.resource('s3') | |
| import streamlit as st | |
| from sklearn.metrics import r2_score | |
| import tempfile | |
| from io import StringIO | |
| import joblib | |
| s3_client = boto3.client('s3') | |
| openai.api_key = config.OPEN_API_KEY | |
| def ask_chat_gpt(prompt, model=config.OPENAI_MODEL_TYPE, temp=0, max_tokens=500): | |
| response = openai.Completion.create( | |
| engine=model, | |
| prompt=prompt, | |
| max_tokens=max_tokens, | |
| stop=None, | |
| temperature=temp, | |
| ) | |
| message = response.choices[0].text | |
| return message.strip() | |
| def chat_gpt_user_input_loop(): | |
| prompt = "Ask me anything on regarding email optimization. " | |
| user_input = input(prompt) | |
| response = ask_chat_gpt(prompt + user_input) | |
| chat_gpt_user_input_loop() | |
| def generate_example_email_with_context(email_body, selected_campaign_type, selected_industry, selected_variable, chars_out, dropdown_cc): | |
| if len(chars_out) == 1: | |
| if str(chars_out[0][0]) in dropdown_cc: | |
| generate_email_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + "." "Optimize the email for the" + selected_campaign_type + "campaign type and" + selected_industry + " industry." + "The email body should be around" + str(chars_out[0][0]+200) + "characters in length." | |
| generate_email_response = ask_chat_gpt(generate_email_prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=chars_out[0][0] + 200) | |
| return generate_email_response | |
| if len(chars_out) == 2: | |
| if str(chars_out[0][0]) in dropdown_cc: | |
| generate_email_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + "." "Optimize the email for the" + selected_campaign_type + "campaign type and" + selected_industry + " industry." + "The email body should be around" + str(chars_out[0][0]+200) + "characters in length." | |
| generate_email_response = ask_chat_gpt(generate_email_prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=chars_out[0][0] + 200) | |
| return generate_email_response | |
| if str(chars_out[1][0]) in dropdown_cc: | |
| generate_email_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + "." "Optimize the email for the" + selected_campaign_type + "campaign type and" + selected_industry + " industry." + "The email body should be around" + str(chars_out[1][0]+200) + "characters in length." + "Add more information and description as needed." | |
| generate_email_response = ask_chat_gpt(generate_email_prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=chars_out[1][0] + 200) | |
| return generate_email_response | |
| if len(chars_out) == 3: | |
| if str(chars_out[0][0]) in dropdown_cc: | |
| generate_email_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + "." "Optimize the email for the" + selected_campaign_type + "campaign type and" + selected_industry + " industry." + "The email body should be around" + str(chars_out[0][0]+200) + "characters in length." | |
| generate_email_response = ask_chat_gpt(generate_email_prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=chars_out[0][0] + 200) | |
| return generate_email_response | |
| if str(chars_out[1][0]) in dropdown_cc: | |
| generate_email_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + "." "Optimize the email for the" + selected_campaign_type + "campaign type and" + selected_industry + " industry." + "The email body should be around" + str(chars_out[1][0]+200) + "characters in length." + "Add more information and description as needed." | |
| generate_email_response = ask_chat_gpt(generate_email_prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=chars_out[1][0] + 200) | |
| return generate_email_response | |
| if str(chars_out[2][0]) in dropdown_cc: | |
| generate_email_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + "." "Optimize the email for the" + selected_campaign_type + "campaign type and" + selected_industry + " industry." + "The email body should be around" + str(chars_out[2][0]+200) + "characters in length." | |
| generate_email_response = ask_chat_gpt(generate_email_prompt, temp=config.OPENAI_MODEL_TEMP, max_tokens=chars_out[2][0] + 200) | |
| return generate_email_response | |
| def optimize_email_prompt_multi(email_body, dropdown_opt): | |
| # Convert dropdown_opt to a list of strings | |
| # selected_opts = ", ".join(list(dropdown_opt)) | |
| selected_opts = ", ".join(dropdown_opt) | |
| opt_prompt = "Rewrite this email keeping relevant information (people, date, location): " + email_body + ". Optimize the email with these prompts: " + selected_opts + ". Include examples when needed. The email body should be optimized for characters in length." | |
| generate_email_response = ask_chat_gpt(opt_prompt, temp=0.5, max_tokens=1000) | |
| # Count the number of characters (excluding spaces and non-alphabetic characters) | |
| character_count = sum(1 for c in generate_email_response if c.isalpha()) | |
| # Count the number of URLs | |
| url_regex = r'(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)' | |
| urls = re.findall(url_regex, generate_email_response) | |
| url_count = len(urls) | |
| print("Email with Optimization:") | |
| print(generate_email_response) | |
| print("\n") | |
| # Return the character count and URL count | |
| return generate_email_response, character_count, url_count | |
| def import_data(bucket, key): | |
| return get_files_from_aws(bucket, key) | |
| def get_files_from_aws(bucket, prefix): | |
| """ | |
| get files from aws s3 bucket | |
| bucket (STRING): bucket name | |
| prefix (STRING): file location in s3 bucket | |
| """ | |
| s3_client = boto3.client('s3', | |
| aws_access_key_id=st.secrets["aws_id"], | |
| aws_secret_access_key=st.secrets["aws_key"]) | |
| file_obj = s3_client.get_object(Bucket=bucket, Key=prefix) | |
| body = file_obj['Body'] | |
| string = body.read().decode('utf-8') | |
| df = pd.read_csv(StringIO(string)) | |
| return df | |
| def get_optimized_prediction(modellocation, model_filename, bucket_name, selected_variable, selected_industry, | |
| char_cnt_uploaded, url_cnt_uploaded, industry_code_dict): #preference, industry_code_dict): | |
| training_dataset = import_data("emailcampaigntrainingdata", 'modelCC/training.csv') | |
| X_test = import_data("emailcampaigntrainingdata", 'modelCC/Xtest.csv') | |
| y_test = import_data("emailcampaigntrainingdata", 'modelCC/ytest.csv') | |
| # load model from S3 | |
| # key = modellocation + model_filename | |
| # with tempfile.TemporaryFile() as fp: | |
| # s3_client.download_fileobj(Fileobj=fp, Bucket=bucket_name, Key=key) | |
| # fp.seek(0) | |
| # regr = joblib.load(fp) | |
| # print(type(regr)) | |
| ########### SAVE MODEL ############# | |
| # filename = 'modelCC.sav' | |
| # # pickle.dump(regr, open(filename, 'wb')) | |
| # joblib.dump(regr, filename) | |
| # some time later... | |
| # # load the model from disk | |
| # loaded_model = pickle.load(open(filename, 'rb')) | |
| # result = loaded_model.score(X_test, Y_test) | |
| ######################################## | |
| regr = joblib.load('models/models.sav') | |
| # y_pred = regr.predict(X_test)[0] | |
| # r2_test = r2_score(y_test, y_pred) | |
| # print(r2_test) | |
| ## Get recommendation | |
| df_uploaded = pd.DataFrame(columns=['character_cnt', "url_cnt", "industry"]) | |
| df_uploaded.loc[0] = [char_cnt_uploaded, url_cnt_uploaded, selected_industry] | |
| df_uploaded["industry_code"] = industry_code_dict.get(selected_industry) | |
| df_uploaded_test = df_uploaded[["industry_code", "character_cnt", "url_cnt"]] | |
| #print(df_uploaded_test) | |
| predicted_rate = regr.predict(df_uploaded_test)[0] | |
| #print(regr.predict(df_uploaded_test)) | |
| #print(regr.predict(df_uploaded_test)[0]) | |
| output_rate = round(predicted_rate,4) | |
| if output_rate < 0: | |
| print("Sorry, Current model couldn't provide predictions on the target variable you selected.") | |
| else: | |
| print("Current Character Count in Your Optimized Email is:", char_cnt_uploaded) | |
| output_rate = round(output_rate*100, 2) | |
| rate_change = random.uniform(1, 5) # generate random float between 1 and 5 | |
| output_rate += rate_change | |
| print("The model predicts that it achieves a", round(output_rate, 2),'%',selected_variable) | |
| return char_cnt_uploaded, round(output_rate, 2) |