import os import time import warnings from datetime import date import openai import pandas as pd import regex as re from dotenv import find_dotenv, load_dotenv from pandas.core.common import SettingWithCopyWarning from twitterscraper import TwitterScraper from functions import functions as f warnings.simplefilter(action="ignore", category=SettingWithCopyWarning) # Set one directory up into ROOT_PATH ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) dotenv_path = find_dotenv() load_dotenv(dotenv_path) OPENAI_AUTHTOKEN = os.environ.get("OPENAI_AUTHTOKEN") class TextClassifier: def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=str(date.today()), user_name='jimmieakesson', num_tweets=20, ): """ Initializes the TextClassifier. :param model_name: name of the model from openai. :param from_date: string of the format 'YYYY-MM-DD'. :param to_date: string of the format 'YYYY-MM-DD'. :param num_tweets: integer value of the maximum number of tweets to be scraped. """ # Make sure user_name is not empty assert user_name is not None, "user_name cannot be empty" self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets) self.model_name = model_name self.from_date = from_date self.to_date = to_date self.num_tweets = num_tweets self.user_name = user_name # Assure that scrape_by_user actually gets num_tweets # add timer in time-loop and stop after 10 seconds start_time = time.time() while True: self.df = self.ts.scrape_by_user(user_name) if 0 < len(self.df) <= num_tweets: break else: if time.time() - start_time > 10: raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.") continue # Make id as type int64 self.df.loc[:, 'id'] = self.df.id.copy().apply(lambda x: int(x)) # self.api_key = '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' openai.api_key = OPENAI_AUTHTOKEN @staticmethod def cleanup_sentiment_results(classification_unclean): """ Cleans up the results of the sentiment classification. :param classification_unclean: string of the classification result. :return: cleaned up string. """ classification_clean = classification_unclean.replace('\n\n', "") classification_clean = classification_clean.replace('\n', "") if classification_clean.startswith(" "): classification_clean = classification_clean.replace(" ", "") return classification_clean def classify_sentiment(self, text: str): """ Classifies the sentiment of a text. """ assert isinstance(text, str) prompt_string = "Classify one sentiment for this tweet:\n \"" prompt_string += text prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \ "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \ "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT=" response = openai.Completion.create( model=self.model_name, prompt=prompt_string, temperature=0.0, max_tokens=256, top_p=1, frequency_penalty=0, presence_penalty=0, logprobs=5 ) classification_unclean = response.choices[0]['text'] classification_clean = self.cleanup_sentiment_results(classification_unclean) return classification_clean.lower() def classify_sentiment_of_tweets(self): """ Classifies the sentiment of a user's tweets. """ df_sentiment = self.df.copy() df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment) self.df = df_sentiment return self.df def analyze_sentiment(self, text: str, sentiment: str): """ Analyzes the sentiment of a text using OpenAI. :param text: string of the tweet text. :param sentiment: string of the sentiment. :return: """ # assert 1 == 2, "Måste fixa prompt innan denna metod körs" prompt_string = "Who is the TARGET of this " prompt_string += sentiment prompt_string += " TWEET?\\nTWEET=\"" prompt_string += text prompt_string += "\"\\n.TARGET should consist of less than 5 words.\\nTARGET=" response = openai.Completion.create( model=self.model_name, prompt=prompt_string, temperature=0, max_tokens=256, top_p=1, frequency_penalty=0, presence_penalty=0 ) analyzed_sentiment = response.choices[0]['text'] # Remove spaces at the start/end of the response if analyzed_sentiment.startswith(' '): analyzed_sentiment = analyzed_sentiment[1:] if analyzed_sentiment.endswith(' '): analyzed_sentiment = analyzed_sentiment[:-1] # Sometimes GPT-3 gives faulty results, so a simple filter is introduced # If the prediction is bad # -> set target value to N/A (not applicable) if len(analyzed_sentiment) > 50: analyzed_sentiment = "N/A" # An attempt to merge target responses that should be the same analyzed_sentiment = re.sub("\(", "", analyzed_sentiment) analyzed_sentiment = re.sub("\)", "", analyzed_sentiment) s_list = ["s", "the swedish social democratic party"] m_list = ["m", "the swedish moderate party", "the moderate party"] mp_list = ["mp", "the swedish green party"] if analyzed_sentiment.lower() == "v": analyzed_sentiment = "Vänsterpartiet" elif analyzed_sentiment.lower() == "mp": analyzed_sentiment = "Miljöpartiet" elif analyzed_sentiment.lower() in s_list: analyzed_sentiment = "Socialdemokraterna" elif analyzed_sentiment.lower() == "c": analyzed_sentiment = "Centerpartiet" elif analyzed_sentiment.lower() == "l": analyzed_sentiment = "Liberalerna" elif analyzed_sentiment.lower() == "kd": analyzed_sentiment = "Kristdemokraterna" elif analyzed_sentiment.lower() in m_list: analyzed_sentiment = "Moderaterna" elif analyzed_sentiment.lower() == "sd": analyzed_sentiment = "Sverigedemokraterna" elif analyzed_sentiment.lower() == "the swedish government": analyzed_sentiment = "Regeringen" analyzed_sentiment = self.cleanup_sentiment_results(analyzed_sentiment) return analyzed_sentiment def analyze_sentiment_of_tweets(self): """ Analyzes the sentiment of a user's tweets. """ # check if 'sentiment' column exists, raise exception if not assert 'sentiment' in self.df.columns, \ "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first." df_sentiment = self.df.copy() df_sentiment['target'] = df_sentiment.apply(lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']), axis=1) self.df = df_sentiment return self.df def classify_topic(self, text: str): """ Classifies the topics of a text. :param text: string of the tweet text. """ assert isinstance(text, str) prompt_string = "Classify this tweet with a general topic and two sub-topics:\n\"" prompt_string += text prompt_string += "\".\nGeneral topic: \nSub topic 1: \nSub topic 2:\n. The classifications should not be " \ "more than 5 words. Numerate each topic in the output. END " response = openai.Completion.create( model="text-davinci-002", prompt=prompt_string, temperature=0, max_tokens=892, top_p=1, frequency_penalty=0, presence_penalty=0 ) classification_unclean = response.choices[0]['text'] classification_clean = self.cleanup_topic_results(classification_unclean) return classification_clean.lower() def classify_topics_of_tweets(self): """ Classifies the topics of a user's tweets. """ df_topic = self.df.copy() df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic) self.df = df_topic return self.df @staticmethod def cleanup_topic_results(text): new_item = text.replace("\n", " ") new_item = new_item.replace(" ", " ") return new_item def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates. :param filename: :return: """ if not os.path.exists(filename): self.df.to_csv(filename, index=False) else: self.df.to_csv(filename, mode='a', header=False, index=False) self.remove_duplicates_from_csv(filename) @staticmethod def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Removes duplicates from csv file. :param filename: filename of csv file :return: None """ with open(filename, 'r') as f: lines = f.readlines() with open(filename, 'w') as f: for line in lines: if line not in lines[lines.index(line) + 1:]: f.write(line) def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Removes tweets that have already been classified. :param filename: filename of csv file :return: None """ df = self.df df = df[df['sentiment'].isnull()] self.df = df self.df_to_csv(filename) def split_topics_into_columns(self): """ Splits the topics into columns. :return: None """ df_topic = self.df.copy() df_topic['topic_temp'] = df_topic['topic'].apply(lambda x: f.separate_string(x)) df_topic_split = pd.DataFrame(df_topic['topic_temp'].tolist(), columns=['main_topic', 'sub_topic_1', 'sub_topic_2']) self.df = df_topic.merge(df_topic_split, how='left', left_index=True, right_index=True) self.df.drop(['topic_temp'], axis=1, inplace=True) def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Classifies the topics/sentiments of a user's tweets. #We presume that all tweets inside the twitterdata.csv file are already classified. :return: None """ # Check if file exists, if not, create it if os.path.exists(filename): # Fetch tweets from csv file already_classified_df = pd.read_csv(filename, on_bad_lines='skip') print("Already classified tweets: {}".format(already_classified_df.shape[0])) # Create a temporary df where values from already_classified_df that are not it self.df are stored temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])] # Remove rows from self.df that are not in already_classified_df self.df = self.df[~self.df['id'].isin(already_classified_df['id'])] # Only classify non-empty rows if self.df.shape[0] > 0: print("Classifying topic of {} tweets...".format(self.df.shape[0])) self.df = self.classify_topics_of_tweets() print("Classifying sentiment of {} tweets...".format(self.df.shape[0])) self.df = self.classify_sentiment_of_tweets() print("Waiting for 1 minute... before analyzing targets...") time.sleep(65) self.df = self.analyze_sentiment_of_tweets() print("Writing to csv...") self.df_to_csv(filename) # Concatenate temp_df and self.df self.df = pd.concat([temp_df, self.df], ignore_index=True) print("Appended {}.".format(filename)) return None else: self.df = pd.concat([temp_df, self.df], ignore_index=True) print("No new tweets to classify.") return None else: print("No csv file found. Continuing without removing already classified tweets.") print("Classifying topics...") self.df = self.classify_topics_of_tweets() print("Classifying sentiments...") self.df = self.classify_sentiment_of_tweets() print("Waiting for 1 minute... before analyzing targets...") time.sleep(65) self.df = self.analyze_sentiment_of_tweets() print("Writing to csv file...") self.df_to_csv(filename) print("Created {}.".format(filename)) return None def get_dataframe(self): """ Returns the dataframe. :return: dataframe """ return self.df def __repr__(self): """ Gives a string that describes which user is classified :return: """ return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "." if __name__ == "__main__": text_classifier = TextClassifier(from_date="2020-01-01", to_date="2020-01-31", user_name='jimmieakesson', num_tweets=20) text_classifier.run_main_pipeline()