import os
import time
import warnings
from datetime import date, timedelta

import openai
import pandas as pd
from dotenv import find_dotenv, load_dotenv
from pandas.core.common import SettingWithCopyWarning
from scipy import spatial
from sentence_transformers import SentenceTransformer
from twitterscraper import TwitterScraper

warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

# Resolve ROOT_PATH to the directory one level above this file
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

class TextClassifier:
    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=str(date.today()),
                 user_list=['jimmieakesson'],
                 num_tweets=20):
        """
        Initializes the TextClassifier.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'.
        :param user_list: list of Twitter user names to scrape.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        """
        # Make sure user_list is not empty
        assert user_list is not None, "user_list cannot be empty"
        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_list
        # TODO: make sure scrape_by_user actually gets num_tweets;
        # add a timer in the time loop and stop after 10 seconds
        # self.df = self.ts.scrape_by_user(user_name)
        self.df = self.ts.scrape_by_several_users(user_list)
        # If 'id' is present in self.df, cast it to int64
        if 'id' in self.df.columns:
            self.df.loc[:, 'id'] = self.df['id'].copy().apply(lambda x: int(x))
        openai.api_key = OPENAI_API_KEY
    def classify_all(self, tweet: str):
        """
        Classifies the topic, subtopic, sentiment and target of a user's tweet.
        """
        # Only classify tweets longer than four words
        valid_tweet = len(tweet.split()) > 4
        if valid_tweet:
            promptstring = "Decide a Tweet's political TOPIC and SUBTOPIC, without classifying it as 'politics'. " \
                           "Also decide whether a political Tweet's SENTIMENT is positive, negative or neutral. " \
                           "Also give the TARGET of the sentiment. \nGive the answer in the form " \
                           "' (TOPIC, SUBTOPIC, SENTIMENT, TARGET)'\n\nTweet: {} \nAnswer: ".format(tweet)
            response = openai.Completion.create(
                model=self.model_name,  # defaults to "text-davinci-002"
                prompt=promptstring,
                temperature=0,
                max_tokens=30,
                top_p=1,
                frequency_penalty=0.5,
                presence_penalty=0
            )
            classification_unclean = response.choices[0]['text']
            classification_clean = self.cleanup_topic_results(classification_unclean)
            # If the model only echoed the template, treat it as an empty classification
            if classification_clean.lower() == "(topic, subtopic, sentiment, target)":
                classification_clean = "(none, none, none, none)"
        else:
            classification_clean = "(none, none, none, none)"
        return classification_clean.lower()
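    # A minimal usage sketch (hypothetical tweet text; the actual labels depend on the
    # GPT-3 completion): classify_all("The government must lower the fuel tax now") would
    # be expected to return a lowercased string such as
    # "(economy, fuel taxes, negative, the government)".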
    def classify_all_list(self):
        """
        Classifies the topic, subtopic, sentiment and target of every tweet in the dataframe.
        """
        df_topic = self.df.copy()
        df_topic['class_tuple'] = df_topic['tweet'].apply(self.classify_all)
        self.df = df_topic
        self.split_tuple_into_columns()
        return self.df
    def cleanup_topic_results(self, text):
        """
        Cleans up a response from GPT-3 into a string matching the format: "(main_topic, sub_topic, sentiment, target)"
        :param text: GPT-3 response
        :return: A string on the format: "(main_topic, sub_topic, sentiment, target)"
        """
        new_item = text.strip()
        new_item = new_item.replace("\n", "")
        new_item = new_item.replace("  ", " ")
        item_control = new_item.replace("(", "")
        item_control = item_control.replace(")", "")
        item_control = item_control.split(",")
        # Strip whitespace and replace empty classifications with 'none'
        item_control = [s.strip() if s.strip() else 'none' for s in item_control]
        diff = 4 - len(item_control)
        if diff < 0:  # If the response gave more than four predictions
            cutout = item_control[diff - 1:]  # Cut out the superfluous predictions
            item_control = item_control[:diff - 1]  # Save the rest
            new_s = ""
            for i in range(len(cutout)):
                new_s += cutout[i]
                if i < -diff:
                    # Merge superfluous predictions, e.g. target = 's', 'mp', 'v' -> target = 's and mp and v'
                    new_s += " and "
            item_control.append(new_s)
        elif diff > 0:  # If the response gave fewer than four predictions
            for i in range(diff):
                item_control.append("none")  # Fill out the tuple with nones
        new_item = str(tuple(item_control))
        new_item = new_item.replace("'", "")
        return new_item
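    # A minimal sketch of the cleanup behaviour (traced from the code above):
    #   cleanup_topic_results(" (Economy, Taxes)")  -> "(Economy, Taxes, none, none)"
    #   cleanup_topic_results("(a, b, c, d, e)")    -> "(a, b, c, d and e)"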
    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes the pandas df to a csv file. If the file already exists, it appends; if not, it creates it.
        It also removes duplicates.
        :param filename: path of the csv file
        :return: None
        """
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            self.df.to_csv(filename, mode='a', header=False, index=False)
        self.remove_duplicates_from_csv(filename)
    def remove_duplicates_from_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicate lines from the csv file.
        :param filename: filename of csv file
        :return: None
        """
        with open(filename, 'r', encoding="utf8") as f:
            lines = f.readlines()
        # Keep the first occurrence of every line; a set makes this O(n) and avoids the
        # pitfall of list.index() always returning the first match, which dropped every
        # copy of a duplicated line instead of keeping one
        seen = set()
        with open(filename, 'w', encoding="utf8") as f:
            for line in lines:
                if line not in seen:
                    f.write(line)
                    seen.add(line)
    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes tweets that have already been classified.
        :param filename: filename of csv file
        :return: None
        """
        df = self.df
        df = df[df['sentiment'].isnull()]
        self.df = df
        self.df_to_csv(filename)
    def split_tuple_into_columns(self):
        """
        Splits the classification tuple (topic, subtopic, sentiment, target) into columns.
        :return: None
        """
        df_topic = self.df.copy()
        df_topic['topics_temp'] = df_topic['class_tuple'].apply(lambda x: tuple(x[1:-1].split(",")))
        df_topic_split = pd.DataFrame(df_topic['topics_temp'].tolist(),
                                      columns=['main_topic', 'sub_topic', 'sentiment', 'target'])
        # Manually add the columns to self.df; cells that are exactly "n/a", empty or a
        # single space are replaced with "none" (exact-cell match, hence no regex=True)
        self.df['main_topic'] = df_topic_split['main_topic'].tolist()
        self.df['main_topic'] = self.df['main_topic'].replace(["n/a", "", " "], "none")
        self.df['main_topic'] = self.df['main_topic'].apply(lambda x: x.strip() if x != "-" else "none")
        self.df['sub_topic'] = df_topic_split['sub_topic'].tolist()
        # In a few of the outputs from GPT-3 the sub_topic is literally "sentiment"
        self.df['sub_topic'] = self.df['sub_topic'].replace(["n/a", "sentiment", "", " "], "none")
        self.df['sub_topic'] = self.df['sub_topic'].apply(lambda x: x.strip() if x != "-" else "none")
        self.df['sentiment'] = df_topic_split['sentiment'].tolist()
        self.df['sentiment'] = self.df['sentiment'].replace(["n/a", "sentiment", "", " "], "none")
        self.df['sentiment'] = self.df['sentiment'].apply(lambda x: x.strip() if x != "-" else "none")
        self.df['target'] = df_topic_split['target'].tolist()
        self.df['target'] = self.df['target'].replace(["n/a", "", " "], "none")
        self.df['target'] = self.df['target'].apply(lambda x: x.strip() if x != "-" else "none")
        self.df.fillna('none', inplace=True)
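    # Example (traced from the code above): a class_tuple of "(economy, taxes, negative, s)"
    # yields main_topic="economy", sub_topic="taxes", sentiment="negative", target="s";
    # elements after the first keep a leading space until the strip above removes it.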
    def get_dataframe(self):
        """
        Returns the dataframe.
        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which users are classified.
        :return: a descriptive string
        """
        return "Classifier for users: " + ", ".join(self.user_name) + " with model: " + self.model_name + "."

    def get_database(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Returns the database containing all dataframes.
        :param filename: filename of csv file
        :return: the full database as a dataframe
        """
        db = pd.read_csv(filename)
        return db
    def cleanup_list(self, uncleaned_list):
        """
        Cleans up faulty predictions.
        :param uncleaned_list: the list to be cleaned
        :return: cleaned list
        """
        uncleaned_list = [s if not isinstance(s, float) else "none" for s in uncleaned_list]
        uncleaned_list = [s if not len(s.split()) > 5 else "none" for s in uncleaned_list]
        uncleaned_list = [s if "swedish" not in s else s.replace("swedish", " ") for s in uncleaned_list]
        uncleaned_list = [s if "politics" not in s else s.replace("politics", "none") for s in uncleaned_list]
        uncleaned_list = [s.replace("  ", " ") for s in uncleaned_list]
        cleaned_list = [s.strip() for s in uncleaned_list]
        return cleaned_list
    def merge_lists(self, main_topic_list, sub_topic_list):
        """
        Merges the topic lists. If either topic is a faulty classification, only the non-faulty topic will be used.
        If both are faulty, the merged topic will be labeled as faulty (ERROR_496).
        :param main_topic_list: A list containing main topics
        :param sub_topic_list: A list containing sub topics
        :return: A list containing string items on the form "main_topic and sub_topic"
        """
        new_list = []
        main_topic_list = self.clean_party_names(main_topic_list)
        sub_topic_list = self.clean_party_names(sub_topic_list)
        for i in range(len(main_topic_list)):
            # If both predictions are faulty, label the merge as ERROR_496 (faulty prediction)
            if main_topic_list[i].lower() == "none" and sub_topic_list[i].lower() == "none":
                new_list.append("ERROR_496")
            elif main_topic_list[i].lower() == "none":
                new_list.append(sub_topic_list[i])
            elif sub_topic_list[i].lower() == "none":
                new_list.append(main_topic_list[i])
            else:
                new_list.append(main_topic_list[i] + " and " + sub_topic_list[i])
        return new_list
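    # Example (traced from the code above, ignoring the party-name encoding):
    #   merge_lists(["economy", "none", "none"], ["taxes", "climate", "none"])
    #   -> ["economy and taxes", "climate", "ERROR_496"]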
    def file_to_mat(self, classification_type):
        """
        Converts a synonym text file to a matrix in which each row contains a general topic/target and its related
        words.
        :param classification_type: The type of classification: topic or target
        :return: a matrix in which the first element of each row is a general topic/target, and the rest are words
        related to the topic
        """
        filename = "{}/data/".format(ROOT_PATH)
        filename += classification_type + "_synonyms.txt"
        with open(filename, encoding='utf-8') as f:
            lines = f.read()
        lines = lines.split("\n")
        topic_list = []
        temp_list = []
        for topic in lines:
            if not topic.endswith("####"):
                temp_list.append(topic)
            else:
                temp_list.append(topic[:-4])  # Remove the end-of-row marker (####)
                topic_list.append(temp_list)
                temp_list = []
        return topic_list
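    # A sketch of the assumed layout of <classification_type>_synonyms.txt (illustrative
    # contents, not taken from the real file): each row starts with a general topic,
    # followed by its synonyms, and "####" closes the row:
    #   economy
    #   taxes
    #   inflation####
    # file_to_mat would parse the above into [["economy", "taxes", "inflation"]].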
    def mat_to_list(self, mat):
        """
        Converts a matrix from file_to_mat() into one list containing all topics and synonyms, and one list with
        mappings for the synonyms.
        :param mat: a matrix from file_to_mat()
        :return: a tuple (full_list, mapped_synonyms)
        """
        full_list = []
        mapped_synonyms = []
        for syns in mat:
            for topic in syns:
                full_list.append(topic)
                mapped_synonyms.append(syns[0])
        return full_list, mapped_synonyms
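    # Example (traced from the code above): mat_to_list([["economy", "taxes"]])
    # -> (["economy", "taxes"], ["economy", "economy"]), i.e. every synonym maps back
    # to the first element of its row.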
    def clean_party_names(self, old_topic_list):
        """
        Encodes all party names as sentences that will yield a high cosine similarity value when merged with another
        topic, without taking the actual party name into account. These sentences have deliberately been composed such
        that they pose a low risk of being close (in the sentence embedding space) to any possible merged topic or
        target that may be encountered.
        :param old_topic_list: list of topics
        :return: list of encoded topics
        """
        # Problem 1: When a party name is encountered, we want to bias the merging towards that party, since a very
        # general main topic (as in the example below) combined with a party name as subtopic is frequent.
        # Example: main_topic = "politics", sub_topic = "sweden democrats" ->
        # combined_topics = "politics and sweden democrats"
        # Problem 2: The party names themselves are biased towards certain topics/targets and lead to faulty merges.
        # Example: Variations of words such as "Sweden" as the target/topic will be biased towards getting merged with
        # "Sweden Democrats".
        # Solution: Encode party names as sentences that are HIGHLY unlikely to be close to anything in the embedding
        # space, thus enforcing a strong bias in the cosine similarity computation towards the party if encountered.
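        # Example (traced from the mapping below): clean_party_names(["sd", "economy"])
        # -> ["keyboard can hire the yellow elephant in cosmos", "economy"]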
        # Nonsense encoding sentences, one per party
        m_enc = "parrot computer is swimming as screen time"
        sd_enc = "keyboard can hire the yellow elephant in cosmos"
        l_enc = "red weather jokes with music and the mathematician"
        mp_enc = "ice piano flies with pencil as direction"
        s_enc = "lamp of fire walks bird gladly tomorrow"
        v_enc = "rooftop cats play physics with cardboard fire"
        c_enc = "differential donuts program sunny waters"
        kd_enc = "cauchy-riemann met sunglasses after rolling yellow"
        party_names = {
            "m": m_enc, "moderaterna": m_enc, "moderates": m_enc, "the moderates": m_enc,
            "moderate party": m_enc, "the moderate party": m_enc, "the moderaterna party": m_enc,
            "sd": sd_enc, "sverigedemokraterna": sd_enc, "sweden democrats": sd_enc,
            "the sweden democrats": sd_enc, "the swedish democrats": sd_enc,
            "swedish democrats": sd_enc, "@jimmieakesson": sd_enc,
            "l": l_enc, "liberalerna": l_enc, "liberals": l_enc, "the liberals": l_enc,
            "the liberal party": l_enc, "liberal people's party": l_enc, "@johanpehrson": l_enc,
            "mp": mp_enc, "miljöpartiet": mp_enc, "de gröna": mp_enc, "green party": mp_enc,
            "the green party": mp_enc, "miljopartiet": mp_enc, "@bolund": mp_enc, "@martastenevi": mp_enc,
            "s": s_enc, "socialdemokraterna": s_enc, "social democratic party": s_enc,
            "the social democratic party": s_enc, "social democrats": s_enc, "the social democrats": s_enc,
            "sosse": s_enc, "sossen": s_enc, "sossar": s_enc, "sossarna": s_enc, "sossarnas": s_enc,
            "swedish social democrats": s_enc, "@swedishpm": s_enc,
            "v": v_enc, "vänsterpartiet": v_enc, "left party": v_enc, "the left party": v_enc,
            "@dadgostarnooshi": v_enc,
            "c": c_enc, "centerpartiet": c_enc, "center party": c_enc, "centre party": c_enc,
            "the center party": c_enc, "@annieloof": c_enc,
            "kd": kd_enc, "kristdemokraterna": kd_enc, "christian democrats": kd_enc,
            "the christian democrats": kd_enc, "@buschebba": kd_enc,
        }
        for i, topic in enumerate(old_topic_list):
            topic = topic.lower()
            topic = topic.replace("  ", " ")
            topic = topic.strip()
            if topic in party_names:
                old_topic_list[i] = party_names.get(topic)
        return old_topic_list
    def reset_party_names(self, old_topic_list):
        """
        Decodes the encoded party names.
        :param old_topic_list: list of topics
        :return: list of decoded topics
        """
        party_names = {
            "m": "parrot computer is swimming as screen time",
            "sd": "keyboard can hire the yellow elephant in cosmos",
            "l": "red weather jokes with music and the mathematician",
            "mp": "ice piano flies with pencil as direction",
            "s": "lamp of fire walks bird gladly tomorrow",
            "v": "rooftop cats play physics with cardboard fire",
            "c": "differential donuts program sunny waters",
            "kd": "cauchy-riemann met sunglasses after rolling yellow",
        }
        # Invert the dictionary so that each encoding sentence maps back to its party abbreviation
        inverted_dict = {v: k for k, v in party_names.items()}
        # Update the values in old_topic_list
        for i, topic in enumerate(old_topic_list):
            if topic in inverted_dict:
                old_topic_list[i] = inverted_dict.get(topic)
        return old_topic_list
    def merge_classifications(self, old_list, classification_type):
        """
        Merges topics/targets from GPT-3 according to a list of predefined topics/targets.
        :param old_list: list of the topics/targets to be merged
        :param classification_type: type of classification: topic or target
        :return: list of new topics/targets
        """
        # Get the tuple of lists containing all synonyms and general topics/targets
        tup_list = self.mat_to_list(self.file_to_mat(classification_type))
        # Save the list of synonyms
        synonym_list = tup_list[0]
        # Save the list of mappings between synonym and general topic/target
        synonym_mappings = tup_list[1]
        # Load the embedding model names
        model_list = ['sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2', 'all-MiniLM-L6-v2']
        result_dict = {}
        # Encode party names
        old_list = self.clean_party_names(old_list)
        for model_name in model_list:
            model = SentenceTransformer(model_name)
            # Encode the topics/targets with the sentence transformer model
            old_list_embeddings = model.encode(old_list, batch_size=64, show_progress_bar=True,
                                               convert_to_tensor=True)
            # Encode the synonyms with the sentence transformer model
            synonym_list_embeddings = model.encode(synonym_list, batch_size=64, show_progress_bar=True,
                                                   convert_to_tensor=True)
            for i, embedded_classification in enumerate(old_list_embeddings):
                result_list = []
                for embedded_synonym in synonym_list_embeddings:
                    # Compute the cosine similarity between every classification and synonym
                    result = 1 - spatial.distance.cosine(embedded_classification, embedded_synonym)
                    result_list.append(result)
                max_value = max(result_list)
                max_index = result_list.index(max_value)
                old_classification = old_list[i]
                # Extract the general topic/target
                new_classification = synonym_mappings[max_index]
                # Save the topic/target that yielded the highest cosine similarity value
                if old_classification not in result_dict:
                    result_dict[old_classification] = [(new_classification, max_value, synonym_list[max_index])]
                else:
                    # Append the results from the next transformer model
                    result_dict[old_classification].append((new_classification, max_value, synonym_list[max_index]))
        new_dict = {}
        # Replace the old values with the new ones
        for old_values in result_dict:
            tup_list = result_dict[old_values]
            max_tup = max(tup_list, key=lambda item: item[1])
            if classification_type == "topic":
                limit = 0.4
            else:
                limit = 0.75
            # Discard the classification if the old topic/target is not similar to anything in our synonym lists
            if max_tup[1] < limit:
                max_tup = ("ERROR_9000", "{:.2f}".format(round(max_tup[1], 2)), "none")
            else:
                max_tup = (max_tup[0], "{:.2f}".format(round(max_tup[1], 2)), max_tup[2])
            new_classification = max_tup
            if old_values not in new_dict:
                new_dict[old_values] = new_classification
        new_list = []
        for old_value in old_list:
            new_list.append(new_dict[old_value])
        return new_list
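    # Example of the output format (illustrative values; assumes "taxes" appears as a
    # synonym under "economy" in topic_synonyms.txt): merge_classifications(["high taxes"],
    # "topic") could return [("economy", "0.83", "taxes")], while a best match below the
    # similarity limit yields ("ERROR_9000", "<cos_sim>", "none").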
    def merge_all(self):
        """
        Merges main topics and subtopics as well as targets, and updates the dataframe.
        :return: None
        """
        df_topics = self.df.copy()
        sub_topics = df_topics['sub_topic'].tolist()
        sub_topics = self.cleanup_list(sub_topics)
        main_topics = df_topics['main_topic'].tolist()
        main_topics = self.cleanup_list(main_topics)
        merged_topic_list = self.merge_lists(main_topics, sub_topics)
        targets = df_topics['target'].tolist()
        targets = self.cleanup_list(targets)
        merged_topics = self.merge_classifications(merged_topic_list, "topic")
        merged_targets = self.merge_classifications(targets, "target")
        print("The following merges were made: ")
        for i, top in enumerate(merged_topic_list):
            print("TOPICS: ", top, " -> ", merged_topics[i])
        # Concatenate the (topic, cos_sim, synonym) and (target, cos_sim, synonym) tuples
        t_list = []
        for i in range(len(merged_topics)):
            t_list.append(tuple(merged_topics[i]) + tuple(merged_targets[i]))
        merged_tuples = t_list
        df_topics['merged_tuple'] = merged_tuples
        df = self.split_merged_tuple_into_columns(df_topics)
        print("Merging finished...")
        self.df = df
    def split_merged_tuple_into_columns(self, df):
        """
        Splits the merged tuple (merged topic, merged target) into columns.
        :param df: dataframe containing a 'merged_tuple' column
        :return: the updated dataframe
        """
        df_topic = df.copy()
        df_topic_split = pd.DataFrame(df_topic['merged_tuple'].tolist(),
                                      columns=['merged_topic', 'cos_sim_topic', 'synonym_topic', 'merged_target',
                                               'cos_sim_target', 'synonym_target'])
        self.df['merged_tuple'] = df_topic['merged_tuple'].tolist()
        # Manually add the columns to self.df
        self.df['merged_topic'] = df_topic_split['merged_topic'].tolist()
        self.df['cos_sim_topic'] = df_topic_split['cos_sim_topic'].tolist()
        self.df['synonym_topic'] = self.reset_party_names(df_topic_split['synonym_topic'].tolist())
        self.df['merged_target'] = df_topic_split['merged_target'].tolist()
        self.df['cos_sim_target'] = df_topic_split['cos_sim_target'].tolist()
        self.df['synonym_target'] = self.reset_party_names(df_topic_split['synonym_target'].tolist())
        return self.df
    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        We presume that all tweets already inside the twitterdata.csv file are classified.
        :return: None
        """
        # Check if the file exists; if not, create it
        if os.path.exists(filename):
            # Fetch tweets from the csv file
            already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))
            # Create a temporary df holding the rows from already_classified_df whose ids appear in self.df
            temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
            # Remove rows from self.df that have already been classified
            self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]
            # Only classify if there are any unclassified rows left
            if self.df.shape[0] > 0:
                time.sleep(10)  # Short pause, presumably to avoid hammering the API
                print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
                self.df = self.classify_all_list()
                # Replace cells that are entirely empty or a single space with 'none'
                self.df = self.df.replace({'': 'none', ' ': 'none'})
                print("Merging topics...")
                self.merge_all()
                print("Writing to csv...")
                self.df_to_csv(filename)
                # Concatenate temp_df and self.df
                self.df = pd.concat([temp_df, self.df], ignore_index=True)
                print("Appended {}.".format(filename))
                return None
            else:
                self.df = pd.concat([temp_df, self.df], ignore_index=True)
                print("No new tweets to classify.")
                return None
        else:
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            # Replace cells that are entirely empty or a single space with 'none'
            self.df = self.df.replace({'': 'none', ' ': 'none'})
            print("Merging topics...")
            self.merge_all()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

if __name__ == "__main__":
    # Observed classification costs:
    #   $6.39 @ 3431 tweets
    #   $18.00 @ 4608 tweets
    #   $11.61 to classify 1177 tweets ~ $0.01 / tweet
    # This snippet scrapes and classifies day by day between a specified start and end date.
    USER_LIST = ['jimmieakesson', 'BuschEbba', 'annieloof', 'JohanPehrson', 'bolund', 'martastenevi', 'SwedishPM',
                 'dadgostarnooshi']
    start_date = date(2022, 8, 4)
    end_date = date(2022, 8, 4)
    delta = timedelta(days=1)
    while start_date <= end_date:
        from_date = start_date.strftime("%Y-%m-%d")
        start_date += delta
        to_date = start_date.strftime("%Y-%m-%d")
        print("curr_date: ", from_date)
        tc = TextClassifier(from_date=from_date, to_date=to_date, user_list=USER_LIST, num_tweets=6000)
        tc.run_main_pipeline()