# Source: politweet / textclassifier / TextClassifier.py
# Last commit: "removed unnecessary functions" (e71b625), shown with Demea9000's picture
# File size: 9.63 kB
import os
import time
import warnings
from datetime import date
import openai
import pandas as pd
import regex as re
from dotenv import find_dotenv, load_dotenv
from pandas.core.common import SettingWithCopyWarning
from twitterscraper import TwitterScraper
from functions import functions as f
# Silence pandas' SettingWithCopyWarning for the whole process.
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
# ROOT_PATH is the parent of the directory that contains this file; the default
# csv location used by TextClassifier is built from it.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Load environment variables from a .env file (if find_dotenv locates one) BEFORE
# reading OPENAI_API_KEY below, so the key may be supplied through that file.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
# None when the variable is not set in the environment.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
class TextClassifier:
    """
    Scrapes the tweets of one Twitter user and classifies every tweet (topic, subtopic,
    sentiment and target of the sentiment) with an OpenAI completion model.

    The scraped and classified tweets are kept in the pandas DataFrame ``self.df``.
    """

    # Prompt sent to the completion endpoint; '{}' is filled with the tweet text.
    _PROMPT_TEMPLATE = (
        "Decide a Tweet's political TOPIC and SUBTOPIC, without classifying it as 'politics'. Also "
        "decide whether a political Tweet's "
        "SENTIMENT is "
        "positive, "
        "negative or neutral. Also give the TARGET of the sentiment. \nGive the answer in the form ' ("
        "TOPIC, SUBTOPIC, SENTIMENT, TARGET)'\n\nTweet: {} \nAnswer: "
    )
    # Give up scraping once a failed attempt finishes later than this many seconds after the start.
    _SCRAPE_TIMEOUT_SECONDS = 10
    # Columns created from the parsed classification tuple, in tuple order.
    _TUPLE_COLUMNS = ['main_topic', 'sub_topic', 'sentiment', 'target']

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier and scrapes the user's tweets into self.df.

        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. Defaults to today's date, resolved
            when the constructor is called (not when the module is imported).
        :param user_name: Twitter user name whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises ValueError: if user_name is None or empty.
        :raises RuntimeError: if no acceptable scrape result was obtained within the time limit.
        """
        # Explicit check instead of `assert`: asserts are stripped when Python runs with -O.
        if not user_name:
            raise ValueError("user_name cannot be empty")
        if to_date is None:
            to_date = str(date.today())

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        # Retry the scrape until it yields between 1 and num_tweets rows. The time limit is
        # only checked after a failed attempt, so a single slow attempt is never interrupted.
        deadline = time.monotonic() + self._SCRAPE_TIMEOUT_SECONDS
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if 0 < len(self.df) <= num_tweets:
                break
            if time.monotonic() > deadline:
                raise RuntimeError(
                    "Could not get enough tweets. Please try again. Perhaps try different time range.")

        # Replace the column (rather than writing through .loc) so the dtype really becomes int64.
        self.df['id'] = self.df['id'].astype('int64')
        # The key comes from the environment / .env file; never hard-code it in the source.
        openai.api_key = OPENAI_API_KEY

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.

        Removes all newlines and any leading spaces. Spaces inside the text are kept.

        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        return classification_unclean.replace('\n', "").lstrip(" ")

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.

        NOTE(review): this method calls self.classify_topic and self.split_topics_into_columns,
        neither of which is defined in this class as shown here, so calling it is expected to
        raise AttributeError. Confirm whether the method should be removed or the helpers restored.

        :return: the updated dataframe.
        """
        df_topic = self.df.copy()
        df_topic['topics'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        self.split_topics_into_columns()
        return self.df

    def classify_all(self, tweet: str):
        """
        Classifies the topic, subtopic, sentiment and target of a single tweet.

        :param tweet: text of the tweet.
        :return: lower-cased answer of the model with newlines replaced by spaces.
        """
        response = openai.Completion.create(
            # Use the model chosen in the constructor (its default is "text-davinci-002").
            model=self.model_name,
            prompt=self._PROMPT_TEMPLATE.format(tweet),
            temperature=0,
            max_tokens=30,
            top_p=1,
            frequency_penalty=0.5,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        return self.cleanup_topic_results(classification_unclean).lower()

    def classify_all_list(self):
        """
        Classifies every tweet in self.df and splits the answers into separate columns.

        :return: the updated dataframe.
        """
        df_topic = self.df.copy()
        df_topic['class_tuple'] = df_topic['tweet'].apply(self.classify_all)
        self.df = df_topic
        self.split_tuple_into_columns()
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Flattens a model answer onto one line.

        :param text: raw answer text.
        :return: text with newlines replaced by spaces and runs of spaces collapsed to one space.
        """
        flattened = text.replace("\n", " ")
        while "  " in flattened:
            flattened = flattened.replace("  ", " ")
        return flattened

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates.
        After appending, duplicated records are removed from the file.

        :param filename: path of the csv file.
        :return: None
        """
        if os.path.exists(filename):
            # Appending without a header assumes the column order matches the existing file.
            self.df.to_csv(filename, mode='a', header=False, index=False)
            self.remove_duplicates_from_csv(filename)
            return
        # First write: make sure the target directory exists.
        parent_dir = os.path.dirname(filename)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        self.df.to_csv(filename, index=False)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicated records from csv file, keeping the first occurrence of each record
        and the original order.

        The file is parsed as csv (not line by line) so quoted fields that contain newlines,
        e.g. multi-line tweets, are compared and kept as whole records.

        :param filename: filename of csv file
        :return: None
        """
        import csv  # local import: the module-level import block is left untouched

        # newline="" lets the csv module handle line endings inside quoted fields itself.
        with open(filename, 'r', newline="", encoding="utf-8") as source:
            records = [tuple(record) for record in csv.reader(source)]
        # dict keys are unique and keep insertion order.
        unique_records = dict.fromkeys(records)
        with open(filename, 'w', newline="", encoding="utf-8") as target:
            csv.writer(target, lineterminator=os.linesep).writerows(unique_records)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Keeps only the tweets whose 'sentiment' value is missing and writes them to the csv file.

        :param filename: filename of csv file
        :return: None
        """
        self.df = self.df[self.df['sentiment'].isnull()]
        self.df_to_csv(filename)

    @staticmethod
    def _strip_parentheses(value):
        """Removes '(' and ')' from a string; any other value is returned unchanged."""
        if isinstance(value, str):
            return value.replace("(", "").replace(")", "")
        return value

    def split_tuple_into_columns(self):
        """
        Splits the topics (topic, subtopic, sentiment, target) into columns.

        Reads the 'class_tuple' column of self.df and adds the columns 'main_topic',
        'sub_topic', 'sentiment' and 'target'.

        :return: None
        """
        df_topic = self.df.copy()
        parsed = df_topic['class_tuple'].apply(f.convert_to_tuple)
        # Reuse the index of df_topic: self.df may have been filtered and then no longer has a
        # 0..n-1 index, so a frame with a fresh index would be attached to the wrong rows.
        df_topic_split = pd.DataFrame(parsed.tolist(), columns=self._TUPLE_COLUMNS, index=df_topic.index)
        for column in self._TUPLE_COLUMNS:
            df_topic[column] = df_topic_split[column]
        # Remove '(' and ')' from main_topic
        df_topic['main_topic'] = df_topic['main_topic'].apply(self._strip_parentheses)
        self.df = df_topic

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        We presume that all tweets inside the csv file are already classified.

        Afterwards self.df holds the scraped tweets that were already in the file followed by
        the newly classified ones.

        :param filename: path of the csv file holding already classified tweets.
        :return: None
        """
        if not os.path.exists(filename):
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

        # Fetch tweets from csv file
        already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
        print("Already classified tweets: {}".format(already_classified_df.shape[0]))
        # Rows of the csv file that belong to tweets scraped in this run.
        previously_classified = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
        # Keep only the scraped tweets that are NOT in the csv file yet.
        self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]

        if self.df.shape[0] > 0:
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv...")
            self.df_to_csv(filename)
            self.df = pd.concat([previously_classified, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
        else:
            self.df = pd.concat([previously_classified, self.df], ignore_index=True)
            print("No new tweets to classify.")
        return None

    def get_dataframe(self):
        """
        Returns the dataframe.

        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which user is classified and with which model.

        :return: description string.
        """
        return "Classifier for user: {} with model: {}.".format(self.user_name, self.model_name)
if __name__ == "__main__":
    # Script entry point: scrape and classify up to 20 tweets of one account
    # from the first half of July 2022, then store them in the default csv file.
    text_classifier = TextClassifier(
        user_name='jimmieakesson',
        num_tweets=20,
        from_date="2022-07-01",
        to_date="2022-07-15",
    )
    text_classifier.run_main_pipeline()