import os
import time
import warnings
from datetime import date

import openai
import pandas as pd
import regex as re
from dotenv import find_dotenv, load_dotenv
# NOTE(review): newer pandas exposes this as pandas.errors.SettingWithCopyWarning;
# keep the original import path to match the pinned pandas version.
from pandas.core.common import SettingWithCopyWarning
from twitterscraper import TwitterScraper

from functions import functions as f

warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

# Set one directory up into ROOT_PATH
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")


class TextClassifier:
    # Maximum number of seconds to keep re-scraping before giving up.
    SCRAPE_TIMEOUT_SECONDS = 15

    def __init__(self,
                 model_name="text-davinci-002",
                 from_date='2022-01-01',
                 to_date=str(date.today()),
                 user_name='jimmieakesson',
                 num_tweets=20,
                 ):
        """
        Initializes the TextClassifier.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'.
        :param user_name: Twitter handle whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        """
        # Validate with an explicit raise instead of assert: asserts are
        # stripped when Python runs with -O, silently skipping this check.
        if user_name is None:
            raise ValueError("user_name cannot be empty")

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        # The scraper may return fewer rows than requested, so retry until the
        # count is within 5 of num_tweets, bounded by a wall-clock timeout.
        start_time = time.time()
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if num_tweets - 5 < len(self.df) <= num_tweets:
                break
            if time.time() - start_time > self.SCRAPE_TIMEOUT_SECONDS:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")

        # Make id as type int64 (scraped ids arrive as strings/objects).
        self.df['id'] = self.df['id'].astype('int64')

        # API key comes from the environment (.env) only; never hard-code it.
        openai.api_key = OPENAI_API_KEY

    def classify_all(self, tweet: str):
        """
        Classifies the topic, subtopic, sentiment and target of a single tweet.
        :param tweet: the tweet text to classify.
        :return: lower-cased cleaned classification string of the form
                 '(topic,subtopic,sentiment,target)'.
        """
        promptstring = "Decide a Tweet's political TOPIC and SUBTOPIC, without classifying it as 'politics'. Also " \
                       "decide whether a political Tweet's " \
                       "SENTIMENT is " \
                       "positive, " \
                       "negative or neutral. Also give the TARGET of the sentiment. \nGive the answer in the form ' (" \
                       "TOPIC, SUBTOPIC, SENTIMENT, TARGET)'\n\nTweet: {} \nAnswer: ".format(tweet)
        response = openai.Completion.create(
            # Use the configured model so the model_name constructor argument
            # actually takes effect (was hard-coded to "text-davinci-002").
            model=self.model_name,
            prompt=promptstring,
            temperature=0,
            max_tokens=30,
            top_p=1,
            frequency_penalty=0.5,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_all_list(self):
        """
        Classifies the topics of all of a user's tweets in self.df and splits
        the resulting tuples into separate columns.
        :return: the updated dataframe.
        """
        df_topic = self.df.copy()
        df_topic['class_tuple'] = df_topic['tweet'].apply(self.classify_all)
        self.df = df_topic
        self.split_tuple_into_columns()
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Strips whitespace and internal newlines/spaces from a raw model answer.
        :param text: raw completion text from the model.
        :return: compacted classification string.
        """
        new_item = text.strip()
        new_item = new_item.replace("\n", "")
        new_item = new_item.replace(" ", "")
        return new_item

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends.
        If not, it creates. It also removes duplicates.
        :param filename: path of the csv file to write to.
        :return: None
        """
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            # Append without header so the existing header is not duplicated.
            self.df.to_csv(filename, mode='a', header=False, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicate lines from csv file, keeping the first occurrence.
        :param filename: filename of csv file
        :return: None
        """
        with open(filename, 'r') as csv_file:
            lines = csv_file.readlines()
        # O(n) single pass with a seen-set (was an O(n^2) list scan); keeping
        # the first occurrence also guarantees the header line stays on top.
        seen = set()
        with open(filename, 'w') as csv_file:
            for line in lines:
                if line not in seen:
                    seen.add(line)
                    csv_file.write(line)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes tweets that have already been classified (keeps only rows
        whose 'sentiment' column is still null) and rewrites the csv.
        :param filename: filename of csv file
        :return: None
        """
        df = self.df
        df = df[df['sentiment'].isnull()]
        self.df = df
        self.df_to_csv(filename)

    def split_tuple_into_columns(self):
        """
        Splits the class_tuple strings (topic, subtopic, sentiment, target)
        into four separate string columns on self.df.
        :return: None
        """
        df_topic = self.df.copy()
        df_topic['topics_temp'] = df_topic['class_tuple'].apply(f.convert_to_tuple)
        df_topic_split = pd.DataFrame(df_topic['topics_temp'].tolist(),
                                      columns=['main_topic', 'sub_topic', 'sentiment', 'target'])
        # Manually add columns to self.df
        self.df['main_topic'] = df_topic_split['main_topic'].astype(str)
        self.df['sub_topic'] = df_topic_split['sub_topic'].astype(str)
        self.df['sentiment'] = df_topic_split['sentiment'].astype(str)
        self.df['target'] = df_topic_split['target'].astype(str)

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        Tweets already present in the csv file are assumed classified and are
        not sent to the model again.
        :param filename: csv file holding previously classified tweets.
        :return: None
        """
        if os.path.exists(filename):
            # Fetch previously classified tweets from the csv file.
            already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))
            # temp_df: already-classified rows whose ids ARE in the fresh scrape,
            # so they can be merged back after classifying only the new ones.
            temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
            # Keep only scraped tweets that have never been classified.
            self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]
            if self.df.shape[0] > 0:
                print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
                self.df = self.classify_all_list()
                print("Writing to csv...")
                self.df_to_csv(filename)
                # Recombine previously classified rows with the newly classified ones.
                self.df = pd.concat([temp_df, self.df], ignore_index=True)
                print("Appended {}.".format(filename))
                return None
            else:
                self.df = pd.concat([temp_df, self.df], ignore_index=True)
                print("No new tweets to classify.")
                return None
        else:
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

    def get_dataframe(self):
        """
        Returns the dataframe.
        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return: descriptive string
        """
        return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "."


if __name__ == "__main__":
    text_classifier = TextClassifier(from_date='2019-01-01',
                                     to_date="2022-07-15",
                                     user_name='jimmieakesson',
                                     num_tweets=60)
    text_classifier.run_main_pipeline()