import os import time import warnings import openai import pandas as pd from dotenv import find_dotenv, load_dotenv from pandas.core.common import SettingWithCopyWarning from twitterscraper import TwitterScraper from sentence_transformers import SentenceTransformer from scipy import spatial from datetime import date, timedelta warnings.simplefilter(action="ignore", category=SettingWithCopyWarning) # Set one directory up into ROOT_PATH ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) dotenv_path = find_dotenv() load_dotenv(dotenv_path) OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") class TextClassifier: def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=str(date.today()), user_list=['jimmieakesson'], num_tweets=20): """ Initializes the TextClassifier. :param model_name: name of the model from openai. :param from_date: string of the format 'YYYY-MM-DD'. :param to_date: string of the format 'YYYY-MM-DD'. :param num_tweets: integer value of the maximum number of tweets to be scraped. """ # Make sure user_name is not empty assert user_list is not None, "user_name cannot be empty" self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets) self.model_name = model_name self.from_date = from_date self.to_date = to_date self.num_tweets = num_tweets self.user_name = user_list # Assure that scrape_by_user actually gets num_tweets # add timer in time-loop and stop after 10 seconds # self.df = self.ts.scrape_by_user(user_name) self.df = self.ts.scrape_by_several_users(user_list) # Check if 'id' is in self.df if 'id' in self.df.columns: # Make id as type int64 self.df.loc[:, 'id'] = self.df.id.copy().apply(lambda x: int(x)) else: # If not do nothing pass openai.api_key = OPENAI_API_KEY def classify_all(self, tweet: str): """ Classifies the topic, subtopic, sentiment and target of a user's tweets. """ import os import openai valid_tweet = len(tweet.split()) > 4 if valid_tweet: openai.api_key = os.getenv("OPENAI_API_KEY") promptstring = "Decide a Tweet's political TOPIC and SUBTOPIC, without classifying it as 'politics'. Also " \ "decide whether a political Tweet's " \ "SENTIMENT is " \ "positive, " \ "negative or neutral. Also give the TARGET of the sentiment. \nGive the answer in the form ' (" \ "TOPIC, SUBTOPIC, SENTIMENT, TARGET)'\n\nTweet: {} \nAnswer: ".format(tweet) response = openai.Completion.create( model="text-davinci-002", prompt=promptstring, temperature=0, max_tokens=30, top_p=1, frequency_penalty=0.5, presence_penalty=0 ) classification_unclean = response.choices[0]['text'] classification_clean = self.cleanup_topic_results(classification_unclean) if classification_clean.lower() == "(topic, subtopic, sentiment, target)": classification_clean = "(none, none, none, none)" else: classification_clean = "(none, none, none, none)" return classification_clean.lower() def classify_all_list(self): """ Classifies the topics of a user's tweets. """ df_topic = self.df.copy() df_topic['class_tuple'] = df_topic['tweet'].apply(self.classify_all) self.df = df_topic self.split_tuple_into_columns() return self.df @staticmethod def cleanup_topic_results(text): """ Cleanup response from GPT-3 to a string matching the format: "(main_topic, sub_topic, sentiment, target)" :param text: GPT-3 response :return: A string on the format: "(main_topic, sub_topic, sentiment, target)" """ new_item = text.strip() new_item = new_item.replace("\n", "") new_item = new_item.replace(" ", "") item_control = new_item.replace("(", "") item_control = item_control.replace(")", "") item_control = item_control.split(",") if ' ' or '' in item_control: item_control = [s.strip() if not (s == ' ' or s == '') else 'none' for s in item_control] # Replace empty classifications with 'none' diff = 4 - len(item_control) if diff < 0: # If response gave more than four predictions cutout = item_control[diff - 1:] # Cut out the superflous predictions item_control = item_control[:diff - 1] # Save the rest new_s = "" for i in range(len(cutout)): new_s += cutout[i] if i < -diff: new_s += " and " # Merge superflous predictions. E.g. target = 's', 'mp', 'v' -> target = 's and mp and v' item_control.append(new_s) elif diff > 0: # If response gave less than four predictions for i in range(diff): item_control.append("none") # Fill out tuple with nones new_item = str(tuple(item_control)) new_item = new_item.replace("'", "") return new_item def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates. :param filename: :return: """ if not os.path.exists(filename): self.df.to_csv(filename, index=False) else: self.df.to_csv(filename, mode='a', header=False, index=False) self.remove_duplicates_from_csv(filename) @staticmethod def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Removes duplicates from csv file. :param filename: filename of csv file :return: None """ with open(filename, 'r', encoding="utf8") as f: lines = f.readlines() with open(filename, 'w', encoding="utf8") as f: for line in lines: if line not in lines[lines.index(line) + 1:]: f.write(line) def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Removes tweets that have already been classified. :param filename: filename of csv file :return: None """ df = self.df df = df[df['sentiment'].isnull()] self.df = df self.df_to_csv(filename) def split_tuple_into_columns(self): """ Splits the topics (topic, subtopic, sentiment, target) into columns. :return: None """ df_topic = self.df.copy() df_topic['topics_temp'] = df_topic['class_tuple'].apply(lambda x: tuple(x[1:-1].split(","))) df_topic_split = pd.DataFrame(df_topic['topics_temp'].tolist(), columns=['main_topic', 'sub_topic', 'sentiment', 'target']) # Manually add columns to self.df self.df['main_topic'] = df_topic_split['main_topic'].tolist() self.df['main_topic'] = self.df['main_topic'].replace(["n/a", "", " "], "none", regex=True) self.df['main_topic'] = self.df['main_topic'].apply( lambda x: x.strip() if not (len(x) == 1 and x == "-") else "none") self.df['sub_topic'] = df_topic_split['sub_topic'].tolist() # In a few of the outputs from GPT-3 the sub_topic = "sentiment" self.df['sub_topic'] = self.df['sub_topic'].replace(["n/a", "sentiment", "", " "], "none", regex=True) self.df['sub_topic'] = self.df['sub_topic'].apply( lambda x: x.strip() if not (len(x) == 1 and x == "-") else "none") self.df['sentiment'] = df_topic_split['sentiment'].tolist() self.df['sentiment'] = self.df['sentiment'].replace(["n/a", "sentiment", "", " "], "none", regex=True) self.df['sentiment'] = self.df['sentiment'].apply( lambda x: x.strip() if not (len(x) == 1 and x == "-") else "none") self.df['target'] = df_topic_split['target'].tolist() self.df['target'] = self.df['target'].replace(["n/a", "", " "], "none", regex=True) self.df['target'] = self.df['target'].apply(lambda x: x.strip() if not (len(x) == 1 and x == "-") else "none") self.df.fillna('none', inplace=True) def get_dataframe(self): """ Returns the dataframe. :return: dataframe """ return self.df def __repr__(self): """ Gives a string that describes which user is classified :return: """ return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "." def get_database(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Returns the database containing all dataframes. :param filename: filename of csv file :return: """ db = pd.read_csv(filename) return db def cleanup_list(self, uncleaned_list): """ Cleans up faulty predictions. :param uncleaned_list: the list to be cleaned :return: cleaned list """ uncleaned_list = [s if not isinstance(s, float) else "none" for s in uncleaned_list] uncleaned_list = [s if not len(s.split()) > 5 else "none" for s in uncleaned_list] uncleaned_list = [s if not "swedish" in s else s.replace("swedish", " ") for s in uncleaned_list] uncleaned_list = [s if not "politics" in s else s.replace("politics", "none") for s in uncleaned_list] uncleaned_list = [s.replace(" ", " ") for s in uncleaned_list] cleaned_list = [s.strip() for s in uncleaned_list] return cleaned_list def merge_lists(self, main_topic_list, sub_topic_list): """ Merges the topic lists. If either topic is a faulty classification, only the non-faulty topic wil be used. If both are faulty, the merged topic will be labeled as faulty (ERROR_496). :param main_topic_list: A list containing main topics :param sub_topic_list: A list containing sub topics :return: A list containing string items on the form "main_topic and sub_topic" """ new_list = [] main_topic_list = self.clean_party_names(main_topic_list) sub_topic_list = self.clean_party_names(sub_topic_list) for i in range(len(main_topic_list)): if main_topic_list[i].lower() == "none" and sub_topic_list[ i].lower() == "none": # If the predictions are faulty new_list.append("ERROR_496") # Label as ERROR_496 (faulty prediction) elif main_topic_list[i].lower() == "none": new_list.append(sub_topic_list[i]) elif sub_topic_list[i].lower() == "none": new_list.append(main_topic_list[i]) else: new_list.append(main_topic_list[i] + " and " + sub_topic_list[i]) return new_list def file_to_mat(self, classification_type): """ Converts a synonym textfile to a matrix in which the rows contain a general topic/target and its related words. :param classification_type: The type of classification: topic or target :return: a matrix in which the first element of each row is a general topic/target, and the rest are words related to the topic """ filename = "{}/data/".format(ROOT_PATH) filename += classification_type + "_synonyms.txt" with open(filename, encoding='utf-8') as f: lines = f.read() lines = lines.split("\n") topic_list = [] temp_list = [] for topic in lines: if not topic.endswith("####"): temp_list.append(topic) else: temp_list.append(topic[:-4]) # Remove the marker (####) topic_list.append(temp_list) temp_list = [] return topic_list def mat_to_list(self, mat): """ Converts a matrix from file_to_mat() into one list containing all topics and synonyms, and one list with mappings for the synonyms. :param mat: a matrix from file_to_mat() :return: """ full_list = [] mapped_synonyms = [] for syns in mat: for topic in syns: full_list.append(topic) mapped_synonyms.append(syns[0]) return full_list, mapped_synonyms def clean_party_names(self, old_topic_list): """ Encodes all party names to sentences that will yield a high cosine similarity value when merged with another topic, without taking the actual party name into account. These sentences have deliberately been composed such that they pose a low risk of being close (in the sentence embedding-space) to any possible merged topic or target that may be encountered. :param old_topic_list: list of topics :return: list of encoded topics """ # Problem 1: When a party name is encountered, we want to bias the merging towards that party since the # occurrence of a very general main topic (as in the example below) plus a party name as subtopic is frequent. # Example: main_topic = "politics", sub_topic = "sweden democrats" -> # combined_topics = "politics and sweden democrats" # Problem 2: The party names themselves are biased towards certain topics/targets and lead to faulty merges. # Example: Variations of words such as "Sweden" as the target/topic will be biased towards getting merged with # "Sweden Democrats". # Solution: Encode party names with sentences that are HIGHLY unlikely to be close to anything in the embedding # space and thus enforcing a strong bias in the cosine similarity computation towards the party if encountered. party_names = {} party_names["m"] = "parrot computer is swimming as screen time" party_names["moderaterna"] = "parrot computer is swimming as screen time" party_names["moderates"] = "parrot computer is swimming as screen time" party_names["the moderates"] = "parrot computer is swimming as screen time" party_names["moderate party"] = "parrot computer is swimming as screen time" party_names["the moderate party"] = "parrot computer is swimming as screen time" party_names["the moderaterna party"] = "parrot computer is swimming as screen time" party_names["sd"] = "keyboard can hire the yellow elephant in cosmos" party_names["sverigedemokraterna"] = "keyboard can hire the yellow elephant in cosmos" party_names["sweden democrats"] = "keyboard can hire the yellow elephant in cosmos" party_names["the sweden democrats"] = "keyboard can hire the yellow elephant in cosmos" party_names["the swedish democrats"] = "keyboard can hire the yellow elephant in cosmos" party_names["swedish democrats"] = "keyboard can hire the yellow elephant in cosmos" party_names["@jimmieakesson"] = "keyboard can hire the yellow elephant in cosmos" party_names["l"] = "red weather jokes with music and the mathematician" party_names["liberalerna"] = "red weather jokes with music and the mathematician" party_names["liberals"] = "red weather jokes with music and the mathematician" party_names["the liberals"] = "red weather jokes with music and the mathematician" party_names["the liberal party"] = "red weather jokes with music and the mathematician" party_names["liberal people's party"] = "red weather jokes with music and the mathematician" party_names["@johanpehrson"] = "red weather jokes with music and the mathematician" party_names["mp"] = "ice piano flies with pencil as direction" party_names["miljöpartiet"] = "ice piano flies with pencil as direction" party_names["de gröna"] = "ice piano flies with pencil as direction" party_names["green party"] = "ice piano flies with pencil as direction" party_names["the green party"] = "ice piano flies with pencil as direction" party_names["miljopartiet"] = "ice piano flies with pencil as direction" party_names["@bolund"] = "ice piano flies with pencil as direction" party_names["@martastenevi"] = "ice piano flies with pencil as direction" party_names["s"] = "lamp of fire walks bird gladly tomorrow" party_names["socialdemokraterna"] = "lamp of fire walks bird gladly tomorrow" party_names["social democratic party"] = "lamp of fire walks bird gladly tomorrow" party_names["the social democratic party"] = "lamp of fire walks bird gladly tomorrow" party_names["social democrats"] = "lamp of fire walks bird gladly tomorrow" party_names["the social democrats"] = "lamp of fire walks bird gladly tomorrow" party_names["sosse"] = "lamp of fire walks bird gladly tomorrow" party_names["sossen"] = "lamp of fire walks bird gladly tomorrow" party_names["sossar"] = "lamp of fire walks bird gladly tomorrow" party_names["sossarna"] = "lamp of fire walks bird gladly tomorrow" party_names["sossarnas"] = "lamp of fire walks bird gladly tomorrow" party_names["swedish social democrats"] = "lamp of fire walks bird gladly tomorrow" party_names["@swedishpm"] = "lamp of fire walks bird gladly tomorrow" party_names["v"] = "rooftop cats play physics with cardboard fire" party_names["vänsterpartiet"] = "rooftop cats play physics with cardboard fire" party_names["left party"] = "rooftop cats play physics with cardboard fire" party_names["the left party"] = "rooftop cats play physics with cardboard fire" party_names["@dadgostarnooshi"] = "rooftop cats play physics with cardboard fire" party_names["c"] = "differential donuts program sunny waters" party_names["centerpartiet"] = "differential donuts program sunny waters" party_names["center party"] = "differential donuts program sunny waters" party_names["centre party"] = "differential donuts program sunny waters" party_names["the center party"] = "differential donuts program sunny waters" party_names["@annieloof"] = "differential donuts program sunny waters" party_names["kd"] = "cauchy-riemann met sunglasses after rolling yellow" party_names["kristdemokraterna"] = "cauchy-riemann met sunglasses after rolling yellow" party_names["christian democrats"] = "cauchy-riemann met sunglasses after rolling yellow" party_names["the christian democrats"] = "cauchy-riemann met sunglasses after rolling yellow" party_names["@buschebba"] = "cauchy-riemann met sunglasses after rolling yellow" for i, topic in enumerate(old_topic_list): topic = topic.lower() topic = topic.replace(" ", " ") topic = topic.strip() if topic in party_names: old_topic_list[i] = party_names.get(topic) return old_topic_list def reset_party_names(self, old_topic_list): """ Decodes the encoded party names. :param old_topic_list: list of topics :return: list of encoded topics """ party_names = {} party_names["m"] = "parrot computer is swimming as screen time" party_names["sd"] = "keyboard can hire the yellow elephant in cosmos" party_names["l"] = "red weather jokes with music and the mathematician" party_names["mp"] = "ice piano flies with pencil as direction" party_names["s"] = "lamp of fire walks bird gladly tomorrow" party_names["v"] = "rooftop cats play physics with cardboard fire" party_names["c"] = "differential donuts program sunny waters" party_names["kd"] = "cauchy-riemann met sunglasses after rolling yellow" inverted_dict = {} # Invert dictionary for k, v in party_names.items(): if v not in inverted_dict: inverted_dict[v] = k # Update values in old_topic_list for i, topic in enumerate(old_topic_list): if topic in inverted_dict.keys(): old_topic_list[i] = inverted_dict.get(topic) return old_topic_list def merge_classifications(self, old_list, classification_type): """ Merges topics/targets from GPT-3 according to a list of predefined topics/targets. :param old_list: list of the topics/targets to be merged :param classification_type: type of classifications: topic or target :return: list of new topics/targets """ # Get the tuple of lists containing all synonyms and general topics/targets tup_list = self.mat_to_list(self.file_to_mat(classification_type)) # Save list of synonyms synonym_list = tup_list[0] # Save list of mappings between synonym and general topic/target synonym_mappings = tup_list[1] # Load embedding model-names model_list = ['sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2', 'all-MiniLM-L6-v2'] result_dict = {} # Encode party names old_list = self.clean_party_names(old_list) for model_name in model_list: model = SentenceTransformer(model_name) # Encode the topics/targets with the sentence transformer model old_list_embeddings = model.encode(old_list, batch_size=64, show_progress_bar=True, convert_to_tensor=True) # Encode the synonyms with the sentence transformer model synonym_list_embeddings = model.encode(synonym_list, batch_size=64, show_progress_bar=True, convert_to_tensor=True) for i, embedded_classification in enumerate(old_list_embeddings): result_list = [] for embedded_synonyms in synonym_list_embeddings: # Compute the cosine similarity between every classification and synonym result = 1 - spatial.distance.cosine(embedded_classification, embedded_synonyms) result_list.append(result) max_value = max(result_list) max_index = result_list.index(max_value) old_classification = old_list[i] # Extract the general topic/target new_classification = synonym_mappings[max_index] # Save the topic/target that yielded the highest cosine similarity value if old_classification not in result_dict: result_dict[old_classification] = [(new_classification, max_value, synonym_list[max_index])] # When we have found the best topics/targets after using the first transformer model else: # Append the results from the next model result_dict[old_classification].append((new_classification, max_value, synonym_list[max_index])) new_dict = {} # Time to replace the old values with the new ones for old_values in result_dict: tup_list = result_dict[old_values] max_tup = max(tup_list, key=lambda item: item[1]) if classification_type == "topic": limit = 0.4 else: limit = 0.75 # Discard classification if the old topic/target is not similar to anything in our synonym lists if max_tup[1] < limit: max_tup = ("ERROR_9000", "{:.2f}".format(round(max_tup[1], 2)), "none") else: max_tup = (max_tup[0], "{:.2f}".format(round(max_tup[1], 2)), max_tup[2]) new_classification = max_tup if old_values not in new_dict: new_dict[old_values] = new_classification new_list = [] for old_value in old_list: new_list.append(new_dict[old_value]) return new_list def merge_all(self): """ Merges main+subtopics, targets, and updates the dataframe. :param df: :return: """ df_topics = self.df.copy() sub_topics = df_topics['sub_topic'] sub_topics = sub_topics.tolist() sub_topics = self.cleanup_list(sub_topics) main_topics = df_topics['main_topic'] main_topics = main_topics.tolist() main_topics = self.cleanup_list(main_topics) merged_topic_list = self.merge_lists(main_topics, sub_topics) targets = df_topics['target'] targets = targets.tolist() targets = self.cleanup_list(targets) merged_topics = self.merge_classifications(merged_topic_list, "topic") merged_targets = self.merge_classifications(targets, "target") print("The following merges were made: ") for i, top in enumerate(merged_topic_list): print("TOPICS: ", top, " -> ", merged_topics[i]) t_list = [] for i in range(len(merged_topics)): t_list.append(tuple(merged_topics[i]) + tuple(merged_targets[i])) merged_tuples = t_list df_topics['merged_tuple'] = merged_tuples df = self.split_merged_tuple_into_columns(df_topics) print("Merging finished...") self.df = df def split_merged_tuple_into_columns(self, df): """ Splits the merged tuple (merged topic, merged target) into columns. :return: None """ df_topic = df.copy() df_topic_split = pd.DataFrame(df_topic['merged_tuple'].tolist(), columns=['merged_topic', 'cos_sim_topic', 'synonym_topic', 'merged_target', 'cos_sim_target', 'synonym_target']) self.df['merged_tuple'] = df_topic['merged_tuple'].tolist() # Manually add columns to self.df self.df['merged_topic'] = df_topic_split['merged_topic'].tolist() self.df['cos_sim_topic'] = df_topic_split['cos_sim_topic'].tolist() self.df['synonym_topic'] = self.reset_party_names(df_topic_split['synonym_topic'].tolist()) self.df['merged_target'] = df_topic_split['merged_target'].tolist() self.df['cos_sim_target'] = df_topic_split['cos_sim_target'].tolist() self.df['synonym_target'] = self.reset_party_names(df_topic_split['synonym_target'].tolist()) return self.df def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)): """ Classifies the topics/sentiments of a user's tweets. #We presume that all tweets inside the twitterdata.csv file are already classified. :return: None """ # Check if file exists, if not, create it if os.path.exists(filename): # Fetch tweets from csv file already_classified_df = pd.read_csv(filename, on_bad_lines='skip') print("Already classified tweets: {}".format(already_classified_df.shape[0])) # Create a temporary df where values from already_classified_df that are not it self.df are stored temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])] # Remove rows from self.df that are not in already_classified_df self.df = self.df[~self.df['id'].isin(already_classified_df['id'])] # Only classify non-empty rows if self.df.shape[0] > 0: time.sleep(10) print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0])) self.df = self.classify_all_list() self.df = self.df.replace({'': 'none'}, regex=True) self.df = self.df.replace({' ': 'none'}, regex=True) print("Merging topics...") self.merge_all() print("Writing to csv...") self.df_to_csv(filename) # Concatenate temp_df and self.df self.df = pd.concat([temp_df, self.df], ignore_index=True) print("Appended {}.".format(filename)) return None else: self.df = pd.concat([temp_df, self.df], ignore_index=True) print("No new tweets to classify.") return None else: print("No csv file found. Continuing without removing already classified tweets.") print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0])) self.df = self.classify_all_list() self.df = self.df.replace({'': 'none'}, regex=True) self.df = self.df.replace({' ': 'none'}, regex=True) print("Merging topics...") self.merge_all() print("Writing to csv file...") self.df_to_csv(filename) print("Created {}.".format(filename)) return None if __name__ == "__main__": # $6.39 @ 3431 tweets # $18.00 @ 4608 tweets # $11.61 to classify 1177 tweets ~ $0.01 / tweet # This code snippet allows for scraping and classifying by simply specifying a start and end date. USER_LIST = ['jimmieakesson', 'BuschEbba', 'annieloof', 'JohanPehrson', 'bolund', 'martastenevi', 'SwedishPM', 'dadgostarnooshi'] start_date = date(2022, 8, 4) end_date = date(2022, 8, 4) delta = timedelta(days=1) while start_date <= end_date: from_date = start_date.strftime("%Y-%m-%d") start_date += delta to_date = start_date.strftime("%Y-%m-%d") print("curr_date: ", from_date) tc = TextClassifier(from_date=from_date, to_date=to_date, user_list=USER_LIST, num_tweets=6000) tc.run_main_pipeline()