import time
import openai
import csv
import regex as re
from twitterscraper import TwitterScraper
from datetime import date
import os
from dotenv import find_dotenv, load_dotenv
import pandas as pd
# ROOT_PATH is the directory one level above the directory that contains this file.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Locate a .env file and load its variables into the process environment so
# that credentials do not have to be hard-coded in the source.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)

# OpenAI API key used by TextClassifier.
# NOTE(review): the environment variable name is assumed to match the constant
# name; confirm against the project's .env file.
OPENAI_AUTHTOKEN = os.environ.get("OPENAI_AUTHTOKEN")


class TextClassifier:
    """
    Scrapes the tweets of one Twitter user and classifies them with an OpenAI completion model.

    The scraped tweets are kept in ``self.df`` (a pandas DataFrame with at least the columns
    ``id`` and ``tweet``). The classification methods add the columns ``topic``, ``sentiment``
    and ``target`` to that frame.
    """

    # Environment variable that holds the OpenAI API key.
    # NOTE(review): name inferred from the identifier used in the original code - confirm.
    API_KEY_ENV_VAR = "OPENAI_AUTHTOKEN"

    # Give up scraping after this many seconds without an acceptable result.
    SCRAPE_TIMEOUT_SECONDS = 10

    # Target predictions longer than this are treated as faulty model output.
    MAX_TARGET_LENGTH = 50

    # Pause between sentiment classification and target analysis in run_main_pipeline.
    TARGET_ANALYSIS_DELAY_SECONDS = 60

    # NOTE(review): the sampling parameters of the completion requests were not visible in the
    # reviewed source. These values are deterministic, conservative choices - confirm them.
    COMPLETION_TEMPERATURE = 0.0
    COMPLETION_MAX_TOKENS = 256

    # Lower-cased model answers that all denote the same target, mapped to one canonical name.
    TARGET_ALIASES = {
        "v": "Vänsterpartiet",
        "mp": "Miljöpartiet",
        "the swedish green party": "Miljöpartiet",
        "s": "Socialdemokraterna",
        "the swedish social democratic party": "Socialdemokraterna",
        "c": "Centerpartiet",
        "l": "Liberalerna",
        "kd": "Kristdemokraterna",
        "m": "Moderaterna",
        "the swedish moderate party": "Moderaterna",
        "the moderate party": "Moderaterna",
        "sd": "Sverigedemokraterna",
        "the swedish government": "Regeringen",
    }

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=str(date.today()),
                 num_tweets=20, user_name=None):
        """
        Initializes the TextClassifier and scrapes the user's tweets.

        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. The default is evaluated once, when the
            module is imported.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :param user_name: Twitter user name whose tweets are scraped. Required.
        :raises ValueError: if user_name is missing or empty.
        :raises Exception: if no acceptable set of tweets could be scraped in time.
        """
        # Make sure user_name is not empty
        if not user_name:
            raise ValueError("user_name cannot be empty")

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        self.df = self._scrape_tweets()
        # Tweet ids need 64 bits; plain ``int`` is not 64 bits wide on every platform.
        self.df['id'] = self.df['id'].astype('int64')

        # Read the API key from the environment instead of keeping it in the source.
        load_dotenv(dotenv_path)
        api_key = os.environ.get(self.API_KEY_ENV_VAR)
        if api_key:
            openai.api_key = api_key

    def _scrape_tweets(self):
        """
        Scrapes the user's tweets, retrying until between 1 and num_tweets tweets are returned.

        :return: DataFrame as returned by the scraper.
        :raises Exception: if no acceptable result was obtained within SCRAPE_TIMEOUT_SECONDS.
        """
        deadline = time.monotonic() + self.SCRAPE_TIMEOUT_SECONDS
        while True:
            tweets = self.ts.scrape_by_user(self.user_name)
            if 0 < len(tweets) <= self.num_tweets:
                return tweets
            if time.monotonic() > deadline:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")

    @staticmethod
    def _resolve_csv_path(filename):
        """
        Returns filename, or the default csv path below ROOT_PATH when filename is None.

        :param filename: path of the csv file or None.
        :return: path of the csv file.
        """
        if filename is None:
            return "{}/data/twitterdata.csv".format(ROOT_PATH)
        return filename

    def _complete(self, prompt):
        """
        Sends a prompt to the OpenAI completion endpoint and returns the raw text of the first choice.

        :param prompt: string of the complete prompt.
        :return: string of the unprocessed model answer.
        """
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt,
            temperature=self.COMPLETION_TEMPERATURE,
            max_tokens=self.COMPLETION_MAX_TOKENS,
        )
        return response.choices[0]['text']

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.

        All newlines and any leading spaces are removed. Spaces inside the text are kept, so
        multi-word answers stay readable.

        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        return classification_unclean.replace('\n', "").lstrip(" ")

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.

        :param text: string of the tweet text.
        :return: lower-cased, cleaned up sentiment string.
        """
        assert isinstance(text, str)

        # NOTE(review): the end of this prompt was cut off in the reviewed source (it stops after
        # "Satire,"). Only the visible part is reproduced here - restore the remainder.
        prompt_string = (
            "Classify one sentiment for this tweet:\n \""
            + text
            + "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement,"
            "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire,"
        )
        classification_unclean = self._complete(prompt_string)
        classification_clean = self.cleanup_sentiment_results(classification_unclean)
        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.

        Adds the column 'sentiment' to self.df.

        :return: the updated DataFrame.
        """
        df_sentiment = self.df.copy()
        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    @staticmethod
    def _normalize_target(raw_target):
        """
        Turns a raw target prediction into a canonical target name.

        :param raw_target: string of the unprocessed model answer.
        :return: canonical target, the cleaned answer, or "N/A" for implausibly long answers.
        """
        # Clean first: aliases can only match once newlines and outer spaces are gone.
        target = TextClassifier.cleanup_sentiment_results(raw_target).strip()

        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced:
        # overly long predictions are set to N/A (not applicable).
        if len(target) > TextClassifier.MAX_TARGET_LENGTH:
            return "N/A"

        # An attempt to merge target responses that should be the same
        target = target.replace("(", "").replace(")", "")
        return TextClassifier.TARGET_ALIASES.get(target.lower(), target)

    def analyze_sentiment(self, text: str, sentiment: str):
        """
        Analyzes the sentiment of a text using OpenAI, i.e. predicts who the sentiment targets.

        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: string of the (normalized) target.
        """
        # NOTE(review): the original author left a disabled assertion here saying that the prompt
        # must be fixed before this method is run. The prompt used literal backslash-n sequences
        # where the other prompts use real newlines; that is corrected below.
        prompt_string = (
            "Who is the TARGET of this "
            + sentiment
            + " TWEET?\nTWEET=\""
            + text
            + "\"\n.TARGET should consist of less than 5 words.\nTARGET="
        )
        return self._normalize_target(self._complete(prompt_string))

    def analyze_sentiment_of_tweets(self):
        """
        Analyzes the sentiment of a user's tweets.

        Adds the column 'target' to self.df. Requires the column 'sentiment'.

        :return: the updated DataFrame.
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."

        df_sentiment = self.df.copy()
        # A plain loop over both columns also works when the frame has no rows.
        df_sentiment['target'] = [
            self.analyze_sentiment(tweet, sentiment)
            for tweet, sentiment in zip(df_sentiment['tweet'], df_sentiment['sentiment'])
        ]
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.

        :param text: string of the tweet text.
        :return: lower-cased, cleaned up string with the topics.
        """
        assert isinstance(text, str)

        prompt_string = (
            "Classify this tweet with a general topic and two sub-topics:\n\""
            + text
            + "\".\nGeneral topic: \nSub topic 1: \nSub topic 2:\n. The classifications should not be "
            "more than 5 words. Numerate each topic in the output. END "
        )
        classification_unclean = self._complete(prompt_string)
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.

        Adds the column 'topic' to self.df.

        :return: the updated DataFrame.
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Cleans up the results of the topic classification.

        Newlines become spaces, runs of whitespace are collapsed into one space and
        surrounding whitespace is removed.

        :param text: string of the classification result.
        :return: cleaned up string.
        """
        return " ".join(text.split())

    def df_to_csv(self, filename=None):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.

        :param filename: path of the csv file. Defaults to '<ROOT_PATH>/data/twitterdata.csv'.
        :return: None
        """
        filename = self._resolve_csv_path(filename)
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        # Only a newly created file gets a header row.
        is_new_file = not os.path.exists(filename)
        self.df.to_csv(filename, mode='w' if is_new_file else 'a', header=is_new_file, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename=None):
        """
        Removes duplicates from csv file.

        The first occurrence of every record is kept and the record order is preserved. The file
        is parsed as csv, so quoted fields that span several lines stay intact.

        :param filename: filename of csv file. Defaults to '<ROOT_PATH>/data/twitterdata.csv'.
        :return: None
        """
        filename = TextClassifier._resolve_csv_path(filename)
        with open(filename, 'r', newline='', encoding='utf-8') as f:
            # dict keys are unique and keep insertion order.
            unique_rows = list(dict.fromkeys(tuple(row) for row in csv.reader(f)))
        with open(filename, 'w', newline='', encoding='utf-8') as f:
            csv.writer(f, lineterminator=os.linesep).writerows(unique_rows)

    def remove_already_classified_tweets(self, filename=None):
        """
        Removes tweets that have already been classified.

        A tweet counts as classified when its 'sentiment' value is set. Without a 'sentiment'
        column nothing has been classified and self.df is left unchanged.

        :param filename: filename of csv file (currently unused).
        :return: None
        """
        if 'sentiment' not in self.df.columns:
            return
        self.df = self.df[self.df['sentiment'].isnull()]

    def get_tweet_by_id(self, id, filename=None):
        """
        Returns tweet by id.

        NOTE(review): the body of this method was not visible in the reviewed source. It is
        reconstructed from the signature and docstring as a lookup in the csv file - confirm.

        :param id: id of tweet
        :param filename: filename of csv file. Defaults to '<ROOT_PATH>/data/twitterdata.csv'.
        :return: DataFrame with the stored rows whose 'id' equals id (empty if there is none).
        """
        stored = pd.read_csv(self._resolve_csv_path(filename), on_bad_lines='skip')
        return stored[stored['id'] == id]

    def _classify_new_tweets(self, report_counts):
        """
        Runs topic, sentiment and target classification on the tweets in self.df.

        :param report_counts: if True, the progress messages include the number of tweets.
        :return: None
        """
        if self.df.empty:
            # Nothing to send to the model; also avoids the pointless one minute pause.
            print("No new tweets to classify.")
            return

        if report_counts:
            print("Classifying topic of {} tweets...".format(self.df.shape[0]))
        else:
            print("Classifying topics...")
        self.df = self.classify_topics_of_tweets()

        if report_counts:
            print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
        else:
            print("Classifying sentiments...")
        self.df = self.classify_sentiment_of_tweets()

        print("Waiting for 1 minute... before analyzing targets...")
        # NOTE(review): the pause announced above was not visible in the reviewed source.
        time.sleep(self.TARGET_ANALYSIS_DELAY_SECONDS)
        self.df = self.analyze_sentiment_of_tweets()

    def run_main_pipeline(self, filename=None):
        """
        Classifies the topics/sentiments of a user's tweets.

        We presume that all tweets inside the twitterdata.csv file are already classified, so
        only tweets whose id is not in the file are sent to the model. The new results are
        written to the file. Afterwards self.df holds the stored rows of the scraped tweets
        followed by the newly classified ones.

        :param filename: filename of csv file. Defaults to '<ROOT_PATH>/data/twitterdata.csv'.
        :return: None
        """
        filename = self._resolve_csv_path(filename)

        if os.path.exists(filename):
            already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))

            # Stored (classified) rows of the tweets that were scraped again.
            previously_classified = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
            # Keep only the scraped tweets that are not stored yet.
            self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]

            self._classify_new_tweets(report_counts=True)

            print("Writing to csv...")
            # Write before concatenating so that only the new rows are appended.
            self.df_to_csv(filename)
            self.df = pd.concat([previously_classified, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
            return None

        print("No csv file found. Continuing without removing already classified tweets.")
        self._classify_new_tweets(report_counts=False)
        print("Writing to csv file...")
        self.df_to_csv(filename)
        print("Created {}.".format(filename))
        return None

    def __repr__(self):
        """
        Gives a string that describes which user is classified

        :return: string with the user name and the model name.
        """
        return f"Classifier for user: {self.user_name} with model: {self.model_name}."


if __name__ == "__main__":
    # Script entry point: build a classifier for one account over a fixed
    # one-month window. Constructing the classifier already scrapes the tweets.
    classifier_options = {
        "from_date": "2020-02-01",
        "to_date": "2020-02-28",
        "user_name": 'jimmieakesson',
        "num_tweets": 20,
    }
    tc = TextClassifier(**classifier_options)