politweet / textclassifier /TextClassifier.py
Demea9000's picture
now assures num_tweets is between 0 and 20.
aded009
import openai
import csv
import regex as re
from twitterscraper import TwitterScraper
from datetime import date
import os
from dotenv import find_dotenv, load_dotenv
import pandas as pd
# ROOT_PATH is the directory one level above the directory that contains this file.
ROOT_PATH = os.path.split(os.path.split(os.path.abspath(__file__))[0])[0]
# Find a .env file and load it, so the token below can be read from the environment.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
# OpenAI credential; None when the environment variable is not set.
OPENAI_AUTHTOKEN = os.getenv("OPENAI_AUTHTOKEN")
class TextClassifier:
    """
    Scrapes a Twitter user's tweets and classifies them with an OpenAI completion model.

    The scraped tweets are kept in ``self.df``. The methods of this class read its ``id`` and
    ``tweet`` columns and add the ``topic``, ``sentiment`` and ``target`` columns.
    """

    # Upper bound on scrape attempts, so that a user with fewer than num_tweets tweets in the
    # requested date range cannot keep the constructor looping forever.
    MAX_SCRAPE_ATTEMPTS = 5

    # Lower-cased model answers that denote the same target, mapped to one canonical name.
    TARGET_ALIASES = {
        "v": "Vänsterpartiet",
        "mp": "Miljöpartiet",
        "the swedish green party": "Miljöpartiet",
        "s": "Socialdemokraterna",
        "the swedish social democratic party": "Socialdemokraterna",
        "c": "Centerpartiet",
        "l": "Liberalerna",
        "kd": "Kristdemokraterna",
        "m": "Moderaterna",
        "the swedish moderate party": "Moderaterna",
        "the moderate party": "Moderaterna",
        "sd": "Sverigedemokraterna",
        "the swedish government": "Regeringen",
    }

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier and scrapes the tweets of user_name.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. None (the default) means today's date,
            resolved when the constructor is called rather than when the module is imported.
        :param user_name: name of the Twitter user whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped (1-20).
        :raises AssertionError: if one of the arguments fails validation.
        """
        if to_date is None:
            to_date = str(date.today())

        # Validate the format of both dates first: the ordering check below compares the
        # strings lexicographically, which is only meaningful for 'YYYY-MM-DD' values.
        date_pattern = r'^\d{4}-\d{2}-\d{2}$'
        assert isinstance(from_date, str) and re.match(date_pattern, from_date) is not None, \
            "from_date must be in the format YYYY-MM-DD"
        assert isinstance(to_date, str) and re.match(date_pattern, to_date) is not None, \
            "to_date must be in the format YYYY-MM-DD"
        # Make sure to_date is later than from_date
        assert from_date < to_date, "from_date must be earlier than to_date"
        # Make sure user_name is neither None nor an empty string
        assert user_name, "user_name cannot be empty"
        # Make sure num_tweets is a positive integer
        assert 0 < num_tweets <= 20, "num_tweets must be a positive integer and at most 20"

        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name
        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)

        # Retry the scrape until it yields num_tweets rows, but give up after
        # MAX_SCRAPE_ATTEMPTS and continue with the largest result seen so far.
        best_result = None
        for _ in range(self.MAX_SCRAPE_ATTEMPTS):
            scraped = self.ts.scrape_by_user(user_name)
            if best_result is None or len(scraped) > len(best_result):
                best_result = scraped
            if len(best_result) >= num_tweets:
                break
        else:
            print("Only {} of the requested {} tweets were scraped after {} attempts.".format(
                len(best_result), num_tweets, self.MAX_SCRAPE_ATTEMPTS))
        self.df = best_result

        # Tweet ids need 64 bits; a plain `int` dtype can be 32 bits wide on some platforms.
        self.df['id'] = self.df['id'].astype('int64')

        # The API key comes from the environment (see OPENAI_AUTHTOKEN); never hard-code it here.
        openai.api_key = OPENAI_AUTHTOKEN

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.
        Newlines are removed and leading/trailing spaces are trimmed; spaces inside the
        answer are kept.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        return classification_unclean.replace('\n', "").strip(" ")

    def classify_sentiment(self, text: str) -> str:
        """
        Classifies the sentiment of a text.
        :param text: string of the tweet text.
        :return: the cleaned, lower-cased answer of the model.
        """
        assert isinstance(text, str)
        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0.0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            logprobs=5
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_sentiment_results(classification_unclean)
        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.
        Adds a 'sentiment' column to self.df.
        :return: the updated self.df.
        """
        df_sentiment = self.df.copy()
        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    @classmethod
    def _normalize_target(cls, raw_target: str) -> str:
        """
        Turns a raw model answer into a target label.
        :param raw_target: string of the text returned by the model.
        :return: "N/A" for overly long answers, the canonical name for a known alias,
            otherwise the trimmed answer without parentheses.
        """
        target = raw_target.strip()
        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced:
        # an overly long answer is treated as a bad prediction -> N/A (not applicable).
        if len(target) > 50:
            target = "N/A"
        target = target.replace("(", "").replace(")", "")
        # An attempt to merge target responses that should be the same
        return cls.TARGET_ALIASES.get(target.lower(), target)

    def analyze_sentiment(self, text: str, sentiment: str) -> str:
        """
        Analyzes the sentiment of a text using OpenAI, i.e. asks the model who its target is.
        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: string of the normalized target.
        """
        # NOTE(review): the "\\n" sequences below put a literal backslash followed by 'n' into
        # the prompt, whereas classify_sentiment uses real newlines. This looks unintended, but
        # changing it alters the model output, so confirm before touching the prompt.
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\\n.TARGET should consist of less than 5 words.\\nTARGET="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        return self._normalize_target(response.choices[0]['text'])

    def analyze_sentiment_of_tweets(self):
        """
        Analyzes the sentiment of a user's tweets.
        Adds a 'target' column to self.df.
        :return: the updated self.df.
        :raises AssertionError: if self.df has no 'sentiment' column.
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."
        df_sentiment = self.df.copy()
        # Built row by row as a list: DataFrame.apply(axis=1) returns a DataFrame instead of a
        # Series when the frame is empty, which cannot be assigned to a single column.
        df_sentiment['target'] = [
            self.analyze_sentiment(tweet, sentiment)
            for tweet, sentiment in zip(df_sentiment['tweet'], df_sentiment['sentiment'])
        ]
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str) -> str:
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        :return: the cleaned, lower-cased answer of the model.
        """
        assert isinstance(text, str)
        prompt_string = "Classify one topic for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nEconomy,\nEnvironment,\nHealth,\nPolitics,\nScience,\nSports,\nTechnology," \
                         "\nTransportation,\nWorld.\nTOPIC="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=892,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.
        Adds a 'topic' column to self.df.
        :return: the updated self.df.
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Cleans up the results of the topic classification.
        Every run of whitespace (newlines included) becomes a single space, and leading and
        trailing whitespace is removed.
        :param text: string of the classification result.
        :return: cleaned up string.
        """
        return " ".join(text.split())

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        The directory of the file is created when it is missing.
        :param filename: filename of csv file
        :return: None
        """
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            # NOTE(review): rows are appended without a header, so the columns of self.df are
            # assumed to match the columns already in the file, in the same order.
            self.df.to_csv(filename, mode='a', header=False, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicates from csv file.
        The first occurrence of every record is kept and the original order is preserved.
        Records are compared as parsed csv rows rather than as physical lines, so a tweet
        whose text spans several lines is treated as one record.
        :param filename: filename of csv file
        :return: None
        """
        # newline='' lets the csv module handle line breaks inside quoted fields itself.
        with open(filename, 'r', encoding='utf-8', newline='') as f:
            rows = [tuple(row) for row in csv.reader(f)]
        # dict keys are unique and keep their insertion order.
        unique_rows = list(dict.fromkeys(rows))
        with open(filename, 'w', encoding='utf-8', newline='') as f:
            csv.writer(f, lineterminator=os.linesep).writerows(unique_rows)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes tweets that have already been classified.
        Only the rows of self.df without a sentiment are kept; the remaining rows are then
        written to the csv file.
        :param filename: filename of csv file
        :return: None
        """
        # Without a 'sentiment' column no tweet has been classified, so nothing is removed.
        if 'sentiment' in self.df.columns:
            self.df = self.df[self.df['sentiment'].isnull()]
        # NOTE(review): this writes unclassified rows to the file, while run_main_pipeline
        # presumes that every row in the file is classified. Confirm this is intended.
        self.df_to_csv(filename)

    def get_tweet_by_id(self, id, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Returns tweet by id.
        NOTE(review): not implemented yet; both arguments are ignored and None is returned.
        :param id: id of tweet
        :param filename: filename of csv file
        :return: None
        """
        return None

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        We presume that all tweets inside the twitterdata.csv file are already classified, so
        only the scraped tweets whose id is not in the file are sent to the model. Afterwards
        self.df holds the stored rows of the known tweets followed by the new ones.
        :param filename: filename of csv file
        :return: None
        """
        if os.path.exists(filename):
            already_classified_df = pd.read_csv(filename)
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))
            scraped_ids = self.df['id']
            stored_ids = already_classified_df['id']
            # Take the already classified tweets from the file, not from the scrape, so that
            # their topic and sentiment are part of the result.
            temp_df = already_classified_df[stored_ids.isin(scraped_ids)]
            # Keep only the scraped tweets that have not been classified yet
            self.df = self.df[~scraped_ids.isin(stored_ids)]
            print("Classifying topic of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_sentiment_of_tweets()
            print("Writing to csv...")
            self.df_to_csv(filename)
            # Concatenate temp_df and self.df
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
            return None

        print("No csv file found. Continuing without removing already classified tweets.")
        print("Classifying topics...")
        self.df = self.classify_topics_of_tweets()
        print("Classifying sentiments...")
        self.df = self.classify_sentiment_of_tweets()
        # The target analysis (analyze_sentiment_of_tweets) is not run by this pipeline.
        print("Writing to csv file...")
        self.df_to_csv(filename)
        print("Created {}.".format(filename))
        return None

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return: string with the user name and the model name.
        """
        return f"Classifier for user: {self.user_name} with model: {self.model_name}."
if __name__ == "__main__":
    # Script entry point: run the full pipeline for one user over a fixed date range.
    classifier = TextClassifier(
        user_name='jimmieakesson',
        from_date="2020-03-10",
        to_date="2020-04-10",
        num_tweets=20,
    )
    classifier.run_main_pipeline()