# Source: politweet/textclassifier/TextClassifier.py
# Last commit by Demea9000 (0b5fde3): "fixed some logical cases in pipeline"
import time
import openai
import csv
import regex as re
from twitterscraper import TwitterScraper
from datetime import date
import os
from dotenv import find_dotenv, load_dotenv
import pandas as pd
import warnings
from pandas.core.common import SettingWithCopyWarning
# Suppress pandas' SettingWithCopyWarning for the whole module.
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

# ROOT_PATH is the parent of the directory that contains this file.
ROOT_PATH = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)

# Locate a .env file and load its variables into the process environment.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)

# OpenAI credential taken from the environment; None when the variable is unset.
OPENAI_AUTHTOKEN = os.getenv("OPENAI_AUTHTOKEN")
class TextClassifier:
    """
    Scrapes the tweets of one Twitter user and classifies them with an OpenAI completion model.

    The scraped tweets live in ``self.df``. The ``classify_*``/``analyze_*`` methods add the columns
    'topic', 'sentiment' and 'target' to that frame, and the csv helpers persist the result so that
    tweets which were classified before do not have to be sent to the model again.
    """

    # Canonical party/government names for the short or English references the model returns.
    # Keys are compared against the lower-cased model answer.
    _TARGET_ALIASES = {
        "v": "Vänsterpartiet",
        "mp": "Miljöpartiet",
        "the swedish green party": "Miljöpartiet",
        "s": "Socialdemokraterna",
        "the swedish social democratic party": "Socialdemokraterna",
        "c": "Centerpartiet",
        "l": "Liberalerna",
        "kd": "Kristdemokraterna",
        "m": "Moderaterna",
        "the swedish moderate party": "Moderaterna",
        "the moderate party": "Moderaterna",
        "sd": "Sverigedemokraterna",
        "the swedish government": "Regeringen",
    }

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20):
        """
        Initializes the TextClassifier and scrapes the user's tweets.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. None (default) means today's date,
            resolved when the instance is created rather than when the module is imported.
        :param user_name: Twitter user whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises Exception: if no acceptable scrape result was obtained within roughly 10 seconds.
        """
        # Make sure user_name is not empty
        assert user_name is not None, "user_name cannot be empty"
        if to_date is None:
            to_date = str(date.today())

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        # Retry the scrape until it yields between 1 and num_tweets rows; give up once more than
        # 10 seconds have passed since the first attempt.
        start_time = time.time()
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if 0 < len(self.df) <= num_tweets:
                break
            if time.time() - start_time > 10:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")

        # Make id as type int64 (replace the column so the dtype really changes)
        self.df['id'] = self.df['id'].astype('int64')

        # The API key comes from the environment only; it must never be written into the source.
        openai.api_key = OPENAI_AUTHTOKEN

    @staticmethod
    def _default_csv_path():
        """
        Returns the default csv location, <ROOT_PATH>/data/twitterdata.csv.
        :return: path string.
        """
        return "{}/data/twitterdata.csv".format(ROOT_PATH)

    def _complete(self, prompt, max_tokens=256, **extra):
        """
        Sends a deterministic (temperature 0) completion request to OpenAI using self.model_name.
        :param prompt: prompt string.
        :param max_tokens: upper bound on the number of generated tokens.
        :param extra: additional keyword arguments forwarded to openai.Completion.create.
        :return: raw text of the first completion choice.
        """
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt,
            temperature=0,
            max_tokens=max_tokens,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            **extra
        )
        return response.choices[0]['text']

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.
        Newlines are removed and surrounding whitespace is trimmed; spaces between words are kept,
        so a multi-word answer stays readable.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        return classification_unclean.replace('\n', "").strip()

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.
        :param text: string of the tweet text.
        :return: lower-cased, cleaned up sentiment label returned by the model.
        """
        assert isinstance(text, str)
        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="
        classification_unclean = self._complete(prompt_string, logprobs=5)
        return self.cleanup_sentiment_results(classification_unclean).lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets and stores it in the 'sentiment' column.
        :return: the updated dataframe (also stored in self.df).
        """
        df_sentiment = self.df.copy()
        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    @classmethod
    def _normalize_target(cls, raw_target):
        """
        Turns a raw model answer into a target label.
        :param raw_target: completion text as returned by the model.
        :return: "N/A" for overlong answers, the canonical name for known aliases, otherwise the
            cleaned answer itself.
        """
        # Clean first: completions usually start with newlines or a space, which would otherwise
        # prevent the alias lookup below from ever matching.
        target = cls.cleanup_sentiment_results(raw_target)
        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced:
        # an overlong prediction is treated as bad -> N/A (not applicable)
        if len(target) > 50:
            return "N/A"
        target = target.replace("(", "").replace(")", "")
        # An attempt to merge target responses that should be the same
        return cls._TARGET_ALIASES.get(target.lower(), target)

    def analyze_sentiment(self, text: str, sentiment: str):
        """
        Asks OpenAI who the target of a tweet with the given sentiment is.
        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: normalized target string ("N/A" when the model answer is longer than 50 characters).
        """
        # Real newlines are used here; an escaped backslash would reach the model as the two
        # literal characters '\' and 'n' instead of a line break.
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\n.TARGET should consist of less than 5 words.\nTARGET="
        return self._normalize_target(self._complete(prompt_string))

    def analyze_sentiment_of_tweets(self):
        """
        Determines the target of each tweet's sentiment and stores it in the 'target' column.
        :return: the updated dataframe (also stored in self.df).
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."
        df_sentiment = self.df.copy()
        df_sentiment['target'] = df_sentiment.apply(
            lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']),
            axis=1)
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        :return: lower-cased, single-line string with the general topic and two sub-topics.
        """
        assert isinstance(text, str)
        prompt_string = "Classify this tweet with a general topic and two sub-topics:\n\""
        prompt_string += text
        prompt_string += "\".\nGeneral topic: \nSub topic 1: \nSub topic 2:\n. The classifications should not be " \
                         "more than 5 words. Numerate each topic in the output. END "
        # Uses the configured model, like the other classification calls.
        classification_unclean = self._complete(prompt_string, max_tokens=892)
        return self.cleanup_topic_results(classification_unclean).lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets and stores them in the 'topic' column.
        :return: the updated dataframe (also stored in self.df).
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Flattens a topic classification onto one line.
        :param text: raw completion text.
        :return: text with newlines replaced by spaces and repeated spaces collapsed to one.
        """
        flattened = text.replace("\n", " ")
        # Consecutive newlines leave runs of spaces behind; squeeze them.
        while "  " in flattened:
            flattened = flattened.replace("  ", " ")
        return flattened

    def df_to_csv(self, filename=None):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        :param filename: csv path; None means <ROOT_PATH>/data/twitterdata.csv.
        :return: None
        """
        if filename is None:
            filename = self._default_csv_path()
        # Create the target directory on first use instead of failing on a fresh checkout.
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            # NOTE: appending without a header relies on self.df having the file's column order.
            self.df.to_csv(filename, mode='a', header=False, index=False)
            self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename=None):
        """
        Removes duplicate records from csv file, keeping the first occurrence of each record.
        The file is parsed as csv, so quoted fields that span several lines are treated as one
        record. Blank lines are dropped.
        :param filename: filename of csv file; None means <ROOT_PATH>/data/twitterdata.csv.
        :return: None
        """
        if filename is None:
            filename = TextClassifier._default_csv_path()
        with open(filename, 'r', newline='', encoding='utf-8') as f:
            records = list(csv.reader(f))

        seen = set()
        unique_records = []
        for record in records:
            key = tuple(record)
            if not record or key in seen:
                continue
            seen.add(key)
            unique_records.append(record)

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            # os.linesep matches the line terminator pandas uses by default when it writes csv.
            csv.writer(f, lineterminator=os.linesep).writerows(unique_records)

    def remove_already_classified_tweets(self, filename=None):
        """
        Keeps only the rows of self.df whose 'sentiment' is null (not yet classified) and writes
        the remaining rows to the csv file via df_to_csv.
        :param filename: filename of csv file; None means <ROOT_PATH>/data/twitterdata.csv.
        :return: None
        """
        self.df = self.df[self.df['sentiment'].isnull()]
        self.df_to_csv(filename)

    def get_tweet_by_id(self, id, filename=None):
        """
        Returns tweet by id, looked up in the csv file.
        :param id: id of tweet
        :param filename: filename of csv file; None means <ROOT_PATH>/data/twitterdata.csv.
        :return: text of the first stored tweet with that id, or None if the file, the needed
            columns or the id are missing.
        """
        if filename is None:
            filename = self._default_csv_path()
        if not os.path.exists(filename):
            return None
        stored = pd.read_csv(filename, on_bad_lines='skip')
        if 'id' not in stored.columns or 'tweet' not in stored.columns:
            return None
        # Compare as strings so that both integer and string ids can be passed in.
        matches = stored.loc[stored['id'].astype(str) == str(id), 'tweet']
        if matches.empty:
            return None
        return matches.iloc[0]

    def _analyze_targets_after_wait(self, wait_seconds):
        """
        Pauses, then runs the target analysis on self.df.
        :param wait_seconds: number of seconds to sleep before the analysis requests are sent
            (presumably to stay under the OpenAI rate limit).
        :return: the updated dataframe.
        """
        print("Waiting for 1 minute... before analyzing targets...")
        time.sleep(wait_seconds)
        return self.analyze_sentiment_of_tweets()

    def run_main_pipeline(self, filename=None, wait_seconds=65):
        """
        Classifies the topics/sentiments/targets of a user's tweets and stores them in the csv file.
        We presume that all tweets inside the csv file are already classified, so only tweets whose
        id is not in the file are sent to the model. Afterwards self.df holds the scraped tweets
        together with their classifications (stored ones are taken from the csv file).
        :param filename: filename of csv file; None means <ROOT_PATH>/data/twitterdata.csv.
        :param wait_seconds: pause between sentiment classification and target analysis.
        :return: None
        """
        if filename is None:
            filename = self._default_csv_path()

        if not os.path.exists(filename):
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topics...")
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiments...")
            self.df = self.classify_sentiment_of_tweets()
            self.df = self._analyze_targets_after_wait(wait_seconds)
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

        # Fetch tweets from csv file
        already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
        print("Already classified tweets: {}".format(already_classified_df.shape[0]))

        # Stored rows for the tweets that were scraped now. They are taken from the csv file,
        # not from self.df, because only the stored rows carry topic/sentiment/target.
        is_known = self.df['id'].isin(already_classified_df['id'])
        previously_classified = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
        previously_classified = previously_classified.drop_duplicates(subset='id')

        # Keep only the scraped tweets that are not in the csv file yet
        self.df = self.df[~is_known]

        if self.df.shape[0] == 0:
            self.df = previously_classified.reset_index(drop=True)
            print("No new tweets to classify.")
            return None

        print("Classifying topic of {} tweets...".format(self.df.shape[0]))
        self.df = self.classify_topics_of_tweets()
        print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
        self.df = self.classify_sentiment_of_tweets()
        self.df = self._analyze_targets_after_wait(wait_seconds)
        print("Writing to csv...")
        self.df_to_csv(filename)
        # Combine stored and newly classified tweets
        self.df = pd.concat([previously_classified, self.df], ignore_index=True)
        print("Appended {}.".format(filename))
        return None

    def __repr__(self):
        """
        Gives a string that describes which user is classified and with which model.
        :return: description string.
        """
        return "Classifier for user: {} with model: {}.".format(self.user_name, self.model_name)
if __name__ == "__main__":
    # Script entry point: classify up to 20 tweets posted by 'jimmieakesson' in March 2020
    # and run the full pipeline with its default csv location.
    classifier = TextClassifier(
        from_date="2020-03-01",
        to_date="2020-03-31",
        user_name='jimmieakesson',
        num_tweets=20,
    )
    classifier.run_main_pipeline()