Spaces:
Runtime error
Runtime error
import os | |
import time | |
import warnings | |
from datetime import date | |
import openai | |
import pandas as pd | |
import regex as re | |
from dotenv import find_dotenv, load_dotenv | |
from pandas.core.common import SettingWithCopyWarning | |
from twitterscraper import TwitterScraper | |
from functions import functions as f | |
# Silence pandas' SettingWithCopyWarning for everything that runs after this module is imported.
warnings.simplefilter("ignore", category=SettingWithCopyWarning)

# ROOT_PATH is the parent of the directory that contains this file.
_module_dir = os.path.dirname(os.path.abspath(__file__))
ROOT_PATH = os.path.dirname(_module_dir)

# Load the variables of the .env file located by find_dotenv() into the environment,
# then read the OpenAI key from there (None when the variable is not set).
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
class TextClassifier:
    """
    Scrapes a Twitter user's tweets and classifies them with an OpenAI completion model.

    Every tweet is labelled with a (topic, subtopic, sentiment, target) tuple. The scraped and
    classified tweets live in ``self.df``; a csv file is used as a cache so that tweets which
    were classified in an earlier run are not sent to the model again.
    """

    # Seconds spent retrying the scraper before giving up.
    SCRAPE_TIMEOUT_SECONDS = 15
    # A scrape is accepted when it holds more than num_tweets - SCRAPE_TOLERANCE
    # and at most num_tweets rows.
    SCRAPE_TOLERANCE = 5
    # Columns created from the classification tuple, in tuple order.
    CLASS_COLUMNS = ('main_topic', 'sub_topic', 'sentiment', 'target')
    # Prompt sent to the completion model; '{}' is replaced by the tweet text.
    _PROMPT_TEMPLATE = (
        "Decide a Tweet's political TOPIC and SUBTOPIC, without classifying it as 'politics'. Also "
        "decide whether a political Tweet's "
        "SENTIMENT is "
        "positive, "
        "negative or neutral. Also give the TARGET of the sentiment. \nGive the answer in the form ' ("
        "TOPIC, SUBTOPIC, SENTIMENT, TARGET)'\n\nTweet: {} \nAnswer: "
    )

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier and scrapes the user's tweets.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. Defaults to today's date, evaluated
            when the classifier is created (not when the module is imported).
        :param user_name: Twitter user whose tweets are scraped. Must not be empty.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises ValueError: if user_name is None or empty.
        :raises RuntimeError: if the scraper does not deliver enough tweets in time.
        """
        # Explicit check instead of `assert`, which is stripped when Python runs with -O.
        if not user_name:
            raise ValueError("user_name cannot be empty")
        if to_date is None:
            to_date = str(date.today())

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        # Work on our own copy so the dtype change below cannot touch the scraper's frame.
        self.df = self._scrape_until_enough().copy()
        # Store ids as int64 so they compare equal to the ids read back from the csv file.
        self.df['id'] = self.df['id'].astype('int64')

        # The API key comes from the environment only; never hard-code it in source.
        openai.api_key = OPENAI_API_KEY

    def _scrape_until_enough(self):
        """
        Calls the scraper repeatedly until it returns roughly num_tweets tweets.
        :return: the first scraped dataframe whose length is in
            (num_tweets - SCRAPE_TOLERANCE, num_tweets].
        :raises RuntimeError: if no attempt succeeds within SCRAPE_TIMEOUT_SECONDS.
        """
        # monotonic() is immune to wall-clock adjustments while we wait.
        deadline = time.monotonic() + self.SCRAPE_TIMEOUT_SECONDS
        minimum_exclusive = self.num_tweets - self.SCRAPE_TOLERANCE
        while True:
            scraped = self.ts.scrape_by_user(self.user_name)
            if minimum_exclusive < len(scraped) <= self.num_tweets:
                return scraped
            if time.monotonic() > deadline:
                raise RuntimeError(
                    "Could not get enough tweets. Please try again. Perhaps try different time range.")

    def classify_all(self, tweet: str):
        """
        Classifies the topic, subtopic, sentiment and target of a single tweet.
        :param tweet: text of the tweet.
        :return: lower-cased model answer with surrounding whitespace, newlines and spaces removed.
        """
        # Only fall back to the environment when no key is configured, so a key that was set
        # programmatically is not overwritten on every call.
        if not openai.api_key:
            openai.api_key = os.getenv("OPENAI_API_KEY")

        response = openai.Completion.create(
            model=self.model_name,  # honour the model chosen in __init__
            prompt=self._PROMPT_TEMPLATE.format(tweet),
            temperature=0,
            max_tokens=30,
            top_p=1,
            frequency_penalty=0.5,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_all_list(self):
        """
        Classifies every tweet in self.df and splits the result into separate columns.
        :return: self.df with the added columns 'class_tuple', 'main_topic', 'sub_topic',
            'sentiment' and 'target'.
        """
        df_topic = self.df.copy()
        df_topic['class_tuple'] = df_topic['tweet'].apply(self.classify_all)
        self.df = df_topic
        self.split_tuple_into_columns()
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Normalizes a raw model answer.
        :param text: raw completion text.
        :return: text without surrounding whitespace, newlines or spaces.
        """
        return text.strip().replace("\n", "").replace(" ", "")

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        :param filename: path of the csv file.
        :return: None
        """
        # Make sure the target directory exists, otherwise to_csv fails on a fresh checkout.
        target_dir = os.path.dirname(filename)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)

        if os.path.exists(filename):
            # The header is already in the file, so append the rows only.
            self.df.to_csv(filename, mode='a', header=False, index=False)
        else:
            self.df.to_csv(filename, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicate rows from csv file, keeping the first occurrence and the row order.
        :param filename: filename of csv file
        :return: None
        """
        # Local import: csv is only needed here and is not among the module's imports.
        import csv

        # Parse real csv rows instead of raw lines so that quoted fields containing newlines
        # (common in tweets) are compared and written back as a single unit.
        with open(filename, 'r', newline='', encoding='utf-8') as csv_in:
            rows = [tuple(row) for row in csv.reader(csv_in)]

        # dict.fromkeys keeps insertion order, i.e. the first occurrence of every row.
        unique_rows = list(dict.fromkeys(rows))

        with open(filename, 'w', newline='', encoding='utf-8') as csv_out:
            csv.writer(csv_out, lineterminator=os.linesep).writerows(unique_rows)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Keeps only the rows of self.df whose 'sentiment' is null and writes them to the csv file.
        :param filename: filename of csv file
        :return: None
        """
        self.df = self.df[self.df['sentiment'].isnull()]
        self.df_to_csv(filename)

    def split_tuple_into_columns(self):
        """
        Splits the topics (topic, subtopic, sentiment, target) into columns.
        Reads self.df['class_tuple'] and adds the columns listed in CLASS_COLUMNS to self.df.
        :return: None
        """
        # convert_to_tuple is defined in the project's functions module; it presumably parses
        # the '(topic,subtopic,sentiment,target)' string into a 4-tuple -- TODO confirm.
        parsed = self.df['class_tuple'].apply(f.convert_to_tuple)

        # Reuse self.df's index: column assignment aligns on index labels, and self.df may have
        # a filtered (non 0..n-1) index, which would otherwise fill the new columns with NaN.
        df_topic_split = pd.DataFrame(parsed.tolist(),
                                      columns=list(self.CLASS_COLUMNS),
                                      index=self.df.index)
        for column in self.CLASS_COLUMNS:
            self.df[column] = df_topic_split[column].astype(str)

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        We presume that all tweets inside the twitterdata.csv file are already classified.
        Afterwards self.df holds the scraped tweets that were already in the csv file together
        with the newly classified ones.
        :param filename: filename of the csv file used as cache.
        :return: None
        """
        if not os.path.exists(filename):
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

        # Fetch tweets from csv file
        already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
        print("Already classified tweets: {}".format(already_classified_df.shape[0]))
        # Cached rows that belong to the tweets scraped in this run
        temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
        # Keep only the scraped tweets that are NOT in the cache yet
        self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]

        if self.df.shape[0] > 0:
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv...")
            self.df_to_csv(filename)
            # Cached rows first, then the freshly classified ones
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
        else:
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("No new tweets to classify.")
        return None

    def get_dataframe(self):
        """
        Returns the dataframe.
        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return: description containing the user name and the model name.
        """
        return "Classifier for user: {} with model: {}.".format(self.user_name, self.model_name)
if __name__ == "__main__":
    # Script entry point: classify up to 60 tweets of one account for a fixed date range
    # and store the result in the default csv file.
    classifier = TextClassifier(
        user_name='jimmieakesson',
        from_date='2019-01-01',
        to_date="2022-07-15",
        num_tweets=60,
    )
    classifier.run_main_pipeline()