# NOTE(review): the lines "Spaces:" / "Runtime error" were page-scrape artifacts
# (Hugging Face Spaces status banner), not part of the program source.
# Standard library
import csv
import os
import time
from datetime import date

# Third party
import openai
import pandas as pd
import regex as re
from dotenv import find_dotenv, load_dotenv
from twitterscraper import TwitterScraper

# Project root: one directory up from this file.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Load secrets from the nearest .env file (if any) into the environment.
load_dotenv(find_dotenv())
OPENAI_AUTHTOKEN = os.environ.get("OPENAI_AUTHTOKEN")
class TextClassifier:
    """
    Scrapes a Twitter user's tweets and classifies their sentiment, target
    and topics using OpenAI completion models, persisting results to a csv.
    """

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'; defaults to today's
                        date, resolved at call time (not frozen at import time).
        :param user_name: Twitter handle of the user whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises ValueError: if user_name is None.
        :raises Exception: if not enough tweets could be scraped within 10 seconds.
        """
        # Real exception instead of `assert`: asserts are stripped under `python -O`.
        if user_name is None:
            raise ValueError("user_name cannot be empty")
        # Resolve the default lazily so "today" means construction time.
        if to_date is None:
            to_date = str(date.today())
        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name
        # Retry scraping until we get between 1 and num_tweets tweets,
        # giving up after 10 seconds.
        start_time = time.time()
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if 0 < len(self.df) <= num_tweets:
                break
            if time.time() - start_time > 10:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")
        # Ids arrive as strings; downstream `isin` comparisons need them as ints.
        self.df['id'] = self.df['id'].copy().astype(int)
        # NOTE(review): a hard-coded API key that was left here commented out
        # has been removed; keys belong in the environment only.
        openai.api_key = OPENAI_AUTHTOKEN

    @staticmethod
    def _default_csv_path():
        # Computed lazily so defining/importing the class does not require
        # ROOT_PATH to exist yet.
        return "{}/data/twitterdata.csv".format(ROOT_PATH)

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.

        Declared @staticmethod: the original definition lacked `self`, so
        calling it as `self.cleanup_sentiment_results(...)` raised TypeError.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        classification_clean = classification_unclean.replace('\n\n', "")
        classification_clean = classification_clean.replace('\n', "")
        if classification_clean.startswith(" "):
            # A leading space triggers removal of every space (original behavior).
            classification_clean = classification_clean.replace(" ", "")
        return classification_clean

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.
        :param text: string of the tweet text.
        :return: lower-cased sentiment label returned by the model.
        """
        assert isinstance(text, str)
        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0.0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            logprobs=5
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_sentiment_results(classification_unclean)
        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.
        :return: self.df with a new 'sentiment' column.
        """
        df_sentiment = self.df.copy()
        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    def analyze_sentiment(self, text: str, sentiment: str):
        """
        Analyzes the sentiment of a text using OpenAI, extracting its target.
        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: short target string (e.g. a canonical party name) or "N/A".
        """
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\\n.TARGET should consist of less than 5 words.\\nTARGET="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        analyzed_sentiment = response.choices[0]['text']
        # Trim at most one space at each end (keeps original behavior).
        if analyzed_sentiment.startswith(' '):
            analyzed_sentiment = analyzed_sentiment[1:]
        if analyzed_sentiment.endswith(' '):
            analyzed_sentiment = analyzed_sentiment[:-1]
        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced:
        # overly long predictions are marked not applicable.
        if len(analyzed_sentiment) > 50:
            analyzed_sentiment = "N/A"
        # Strip parentheses so equivalent targets compare equal.
        # Raw strings: "\(" is an invalid escape in a plain string literal.
        analyzed_sentiment = re.sub(r"\(", "", analyzed_sentiment)
        analyzed_sentiment = re.sub(r"\)", "", analyzed_sentiment)
        # Map abbreviations / English party names to canonical Swedish names.
        s_list = ["s", "the swedish social democratic party"]
        m_list = ["m", "the swedish moderate party", "the moderate party"]
        mp_list = ["mp", "the swedish green party"]
        if analyzed_sentiment.lower() == "v":
            analyzed_sentiment = "Vänsterpartiet"
        elif analyzed_sentiment.lower() in mp_list:
            # was `== "mp"`, which silently ignored "the swedish green party"
            analyzed_sentiment = "Miljöpartiet"
        elif analyzed_sentiment.lower() in s_list:
            analyzed_sentiment = "Socialdemokraterna"
        elif analyzed_sentiment.lower() == "c":
            analyzed_sentiment = "Centerpartiet"
        elif analyzed_sentiment.lower() == "l":
            analyzed_sentiment = "Liberalerna"
        elif analyzed_sentiment.lower() == "kd":
            analyzed_sentiment = "Kristdemokraterna"
        elif analyzed_sentiment.lower() in m_list:
            analyzed_sentiment = "Moderaterna"
        elif analyzed_sentiment.lower() == "sd":
            analyzed_sentiment = "Sverigedemokraterna"
        elif analyzed_sentiment.lower() == "the swedish government":
            analyzed_sentiment = "Regeringen"
        analyzed_sentiment = self.cleanup_sentiment_results(analyzed_sentiment)
        return analyzed_sentiment

    def analyze_sentiment_of_tweets(self):
        """
        Analyzes the sentiment of a user's tweets.
        :return: self.df with a new 'target' column.
        :raises AssertionError: if classify_sentiment_of_tweets was not run first.
        """
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."
        df_sentiment = self.df.copy()
        df_sentiment['target'] = df_sentiment.apply(lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']),
                                                    axis=1)
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        :return: lower-cased topic classification string.
        """
        assert isinstance(text, str)
        prompt_string = "Classify this tweet with a general topic and two sub-topics:\n\""
        prompt_string += text
        prompt_string += "\".\nGeneral topic: \nSub topic 1: \nSub topic 2:\n. The classifications should not be " \
                         "more than 5 words. Numerate each topic in the output. END "
        response = openai.Completion.create(
            # Was hard-coded "text-davinci-002"; now honors the configured model
            # like classify_sentiment/analyze_sentiment do (same default value).
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=892,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.
        :return: self.df with a new 'topic' column.
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Flattens a multi-line topic answer into a single-spaced line.

        Declared @staticmethod: the original definition lacked `self`, so
        calling it as `self.cleanup_topic_results(...)` raised TypeError.
        :param text: raw completion text.
        :return: cleaned string.
        """
        new_item = text.replace("\n", " ")
        new_item = new_item.replace("  ", " ")
        return new_item

    def df_to_csv(self, filename=None):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        :param filename: path of the csv file; defaults to <ROOT_PATH>/data/twitterdata.csv.
        :return: None
        """
        if filename is None:
            filename = self._default_csv_path()
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            self.df.to_csv(filename, mode='a', header=False, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename=None):
        """
        Removes duplicate lines from the csv file, keeping the first occurrence.

        Declared @staticmethod: the original definition lacked `self`, so
        `self.remove_duplicates_from_csv(filename)` passed the instance as the
        filename. The original keep-test (`line not in lines[index+1:]`) also
        dropped *every* copy of a duplicated line and was O(n^2); a seen-set
        keeps one copy per distinct line in O(n).
        :param filename: filename of csv file; defaults to <ROOT_PATH>/data/twitterdata.csv.
        :return: None
        """
        if filename is None:
            filename = TextClassifier._default_csv_path()
        with open(filename, 'r') as f:
            lines = f.readlines()
        seen = set()
        with open(filename, 'w') as f:
            for line in lines:
                if line not in seen:
                    seen.add(line)
                    f.write(line)

    def remove_already_classified_tweets(self, filename=None):
        """
        Keeps only tweets without a sentiment yet and writes them to the csv file.
        :param filename: filename of csv file; defaults to <ROOT_PATH>/data/twitterdata.csv.
        :return: None
        """
        df = self.df
        df = df[df['sentiment'].isnull()]
        self.df = df
        self.df_to_csv(filename)

    def get_tweet_by_id(self, id, filename=None):
        """
        Returns the first stored tweet row matching the given id.
        (The original method was an empty stub that always returned None;
        returning None when nothing matches keeps that behavior.)
        :param id: id of tweet
        :param filename: csv file to search; defaults to <ROOT_PATH>/data/twitterdata.csv.
        :return: pandas Series of the matching row, or None if absent.
        """
        if filename is None:
            filename = self._default_csv_path()
        if not os.path.exists(filename):
            return None
        df = pd.read_csv(filename, on_bad_lines='skip')
        match = df[df['id'] == int(id)]
        return None if match.empty else match.iloc[0]

    def run_main_pipeline(self, filename=None):
        """
        Classifies the topics/sentiments of a user's tweets.
        We presume that all tweets inside the csv file are already classified.
        :param filename: csv file of previously classified tweets.
        :return: None
        """
        if filename is None:
            filename = self._default_csv_path()
        if os.path.exists(filename):
            already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))
            # Set aside freshly scraped rows whose id is already classified...
            temp_df = self.df[self.df['id'].isin(already_classified_df['id'])]
            # ...and classify only the rows that are new.
            self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]
            print("Classifying topic of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_sentiment_of_tweets()
            print("Waiting for 1 minute... before analyzing targets...")
            # NOTE(review): presumably a pause for OpenAI rate limits — confirm.
            time.sleep(65)
            self.df = self.analyze_sentiment_of_tweets()
            print("Writing to csv...")
            self.df_to_csv(filename)
            # Merge the already-classified rows back into the in-memory frame.
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
            return None
        else:
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topics...")
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiments...")
            self.df = self.classify_sentiment_of_tweets()
            print("Waiting for 1 minute... before analyzing targets...")
            time.sleep(65)
            self.df = self.analyze_sentiment_of_tweets()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

    def __repr__(self):
        """
        Gives a string that describes which user is classified.
        :return: description string.
        """
        return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "."
if __name__ == "__main__":
    # Scrape a small February-2020 sample and run the full classification pipeline.
    classifier = TextClassifier(
        from_date="2020-02-01",
        to_date="2020-02-28",
        user_name="jimmieakesson",
        num_tweets=20,
    )
    classifier.run_main_pipeline()