Spaces:
Runtime error
Runtime error
File size: 9,629 Bytes
5214b07 dc54afb 5214b07 dc54afb eceff29 5214b07 dc67c78 12597ef 4e39d19 5214b07 4e39d19 c3a63c7 44e11ec eceff29 ad1a1ba 297c37f eceff29 dc67c78 c706f5e 32119e0 725b13f 1ace546 5cf061d 130dfd8 ca6da9d 130dfd8 ae34e1d 130dfd8 2a4df2c 1ace546 aded009 672130a aded009 672130a aded009 672130a aded009 0e7f596 4e39d19 32119e0 297c37f 32119e0 1ace546 32119e0 130dfd8 5ce4dc5 130dfd8 0e7f596 5f7371a 0e7f596 5f7371a 0e7f596 130dfd8 297c37f 44e11ec fe688af 5cf061d 44e11ec 456b287 44e11ec 32119e0 456b287 5cf061d 456b287 5cf061d 456b287 32119e0 297c37f fe688af f8f979f fe688af f8f979f 1903058 0b5fde3 6fb109c f8f979f 0e7f596 b944971 0e7f596 0b5fde3 297c37f 0b5fde3 1903058 297c37f f8f979f 0e7f596 15ca093 b944971 9bb5623 da62f8f 32119e0 fe688af 456b287 297c37f b944971 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
import os
import time
import warnings
from datetime import date
import openai
import pandas as pd
import regex as re
from dotenv import find_dotenv, load_dotenv
from pandas.core.common import SettingWithCopyWarning
from twitterscraper import TwitterScraper
from functions import functions as f
# Silence pandas' SettingWithCopyWarning for the whole process.
warnings.simplefilter("ignore", category=SettingWithCopyWarning)

# ROOT_PATH is the parent of the directory that contains this file.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Load variables from the located .env file into the process environment,
# then read the OpenAI key from there (None when the variable is not set).
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
class TextClassifier:
    """
    Scrapes a user's tweets and classifies each one with an OpenAI completion model.

    For every tweet the model is asked for a (topic, subtopic, sentiment, target) answer. The tweets and
    their classifications live in ``self.df``; a csv file is used as a cache so that tweets which were
    classified before are not sent to the model again.
    """

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier and scrapes the user's tweets.

        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. ``None`` (the default) means today's date,
            resolved when the classifier is created rather than when the module was imported.
        :param user_name: name of the Twitter user whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises AssertionError: if user_name is None.
        :raises Exception: if no acceptable scrape result was obtained within about 10 seconds.
        """
        # Raised explicitly (not via `assert`) so the check survives `python -O`;
        # the exception type is kept for existing callers.
        if user_name is None:
            raise AssertionError("user_name cannot be empty")
        if to_date is None:
            to_date = str(date.today())

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        # Retry the scrape until it returns between 1 and num_tweets rows. The elapsed time is only
        # checked after a failed attempt, so a single slow scrape can exceed the 10 second budget.
        start_time = time.monotonic()
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if 0 < len(self.df) <= num_tweets:
                break
            if time.monotonic() - start_time > 10:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")

        # Make id as type int. A plain column assignment replaces the column (and its dtype);
        # `df.loc[:, 'id'] = ...` may write into the existing column and keep the old dtype.
        self.df['id'] = self.df['id'].apply(int)

        # The key comes from the environment / .env file; never hard-code it in this file.
        openai.api_key = OPENAI_API_KEY

    @staticmethod
    def cleanup_sentiment_results(classification_unclean: str) -> str:
        """
        Cleans up the results of the sentiment classification.

        All newlines are removed. If the remaining text starts with a space, every space in the text
        (not only the leading one) is removed as well.

        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        classification_clean = classification_unclean.replace('\n', "")
        if classification_clean.startswith(" "):
            classification_clean = classification_clean.replace(" ", "")
        return classification_clean

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets and stores them in a 'topics' column.

        NOTE(review): this relies on ``self.classify_topic`` and ``self.split_topics_into_columns``,
        neither of which is defined in this class as visible here; unless they are provided elsewhere
        (e.g. by a subclass) calling this method raises AttributeError.

        :return: the updated dataframe.
        """
        df_topic = self.df.copy()
        df_topic['topics'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        self.split_topics_into_columns()
        return self.df

    def classify_all(self, tweet: str) -> str:
        """
        Classifies the topic, subtopic, sentiment and target of a single tweet.

        :param tweet: text of the tweet.
        :return: the model's answer, cleaned by cleanup_topic_results and lower-cased.
        """
        openai.api_key = os.getenv("OPENAI_API_KEY")
        prompt_template = (
            "Decide a Tweet's political TOPIC and SUBTOPIC, without classifying it as 'politics'. Also "
            "decide whether a political Tweet's "
            "SENTIMENT is "
            "positive, "
            "negative or neutral. Also give the TARGET of the sentiment. \nGive the answer in the form ' ("
            "TOPIC, SUBTOPIC, SENTIMENT, TARGET)'\n\nTweet: {} \nAnswer: "
        )
        promptstring = prompt_template.format(tweet)
        response = openai.Completion.create(
            # Use the model chosen at construction time (its default is "text-davinci-002").
            model=self.model_name,
            prompt=promptstring,
            temperature=0,
            max_tokens=30,
            top_p=1,
            frequency_penalty=0.5,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_all_list(self):
        """
        Classifies every tweet in the dataframe with classify_all.

        The raw answers are stored in a 'class_tuple' column and then split into separate columns by
        split_tuple_into_columns.

        :return: the updated dataframe.
        """
        df_topic = self.df.copy()
        df_topic['class_tuple'] = df_topic['tweet'].apply(self.classify_all)
        self.df = df_topic
        self.split_tuple_into_columns()
        return self.df

    @staticmethod
    def cleanup_topic_results(text: str) -> str:
        """
        Cleans up a raw model answer.

        Newlines are replaced by spaces, then each double space is replaced by a single space (one pass).

        :param text: raw answer text.
        :return: cleaned up string.
        """
        single_space = " "
        # Built by repetition so the literal cannot silently collapse to one space
        # (replacing a single space with a single space would be a no-op).
        double_space = single_space * 2
        new_item = text.replace("\n", single_space)
        new_item = new_item.replace(double_space, single_space)
        return new_item

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes
        duplicates.

        :param filename: path of the csv file.
        :return: None
        """
        if not os.path.exists(filename):
            # Create the target directory first so that the very first write cannot fail on it.
            target_dir = os.path.dirname(filename)
            if target_dir:
                os.makedirs(target_dir, exist_ok=True)
            self.df.to_csv(filename, index=False)
        else:
            # Appended without a header: the columns are expected to match the existing file.
            self.df.to_csv(filename, mode='a', header=False, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicated lines from csv file, keeping the first occurrence of each line.

        NOTE(review): the comparison works on physical lines, so a quoted field that contains line breaks
        is compared fragment by fragment.

        :param filename: filename of csv file
        :return: None
        """
        with open(filename, 'r', encoding='utf-8') as csv_file:
            lines = csv_file.readlines()
        # dict.fromkeys keeps insertion order, i.e. the first occurrence of every line survives.
        unique_lines = list(dict.fromkeys(lines))
        with open(filename, 'w', encoding='utf-8') as csv_file:
            csv_file.writelines(unique_lines)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Keeps only the tweets whose 'sentiment' value is missing and writes them to the csv file.

        :param filename: filename of csv file
        :return: None
        """
        not_yet_classified = self.df['sentiment'].isnull()
        self.df = self.df[not_yet_classified]
        self.df_to_csv(filename)

    def split_tuple_into_columns(self):
        """
        Splits the 'class_tuple' answers (topic, subtopic, sentiment, target) into the columns
        'main_topic', 'sub_topic', 'sentiment' and 'target'.

        :return: None
        """
        df_topic = self.df.copy()
        df_topic['topics_temp'] = df_topic['class_tuple'].apply(f.convert_to_tuple)
        df_topic_split = pd.DataFrame(df_topic['topics_temp'].tolist(),
                                      columns=['main_topic', 'sub_topic', 'sentiment', 'target'])
        # df_topic_split has a fresh 0..n-1 index, while self.df may carry arbitrary labels (for example
        # after rows were filtered out). Merge row by row on position and restore the labels afterwards.
        original_index = df_topic.index
        merged = df_topic.reset_index(drop=True).merge(df_topic_split, how='left',
                                                       left_index=True, right_index=True)
        merged.index = original_index
        merged.drop(['topics_temp'], axis=1, inplace=True)

        # Remove '(' and ')' from main_topic; values that are not strings (e.g. missing) are left as is.
        def strip_parentheses(value):
            if isinstance(value, str):
                return value.replace("(", "").replace(")", "")
            return value

        merged['main_topic'] = merged['main_topic'].apply(strip_parentheses)
        self.df = merged

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        We presume that all tweets inside the twitterdata.csv file are already classified.

        Afterwards ``self.df`` holds the scraped tweets together with their classifications, whether they
        were read from the csv file or classified in this run.

        :param filename: path of the csv file that caches classified tweets.
        :return: None
        """
        if not os.path.exists(filename):
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

        # Fetch tweets from csv file
        already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
        print("Already classified tweets: {}".format(already_classified_df.shape[0]))
        # Cached rows that belong to the tweets scraped for this run
        temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
        # Drop the scraped tweets that are already in the cache; only the rest needs classifying
        self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]

        if self.df.shape[0] > 0:
            print("Classifying topic, subtopic, sentiment and target of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_all_list()
            print("Writing to csv...")
            self.df_to_csv(filename)
            # Concatenate temp_df and self.df
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
            return None

        self.df = pd.concat([temp_df, self.df], ignore_index=True)
        print("No new tweets to classify.")
        return None

    def get_dataframe(self):
        """
        Returns the dataframe.

        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which user is classified and with which model.

        :return: description string.
        """
        return "Classifier for user: {} with model: {}.".format(self.user_name, self.model_name)
if __name__ == "__main__":
    # Example run: up to 20 tweets by 'jimmieakesson' between 2022-07-01 and 2022-07-15.
    run_settings = {
        "from_date": "2022-07-01",
        "to_date": "2022-07-15",
        "user_name": 'jimmieakesson',
        "num_tweets": 20,
    }
    text_classifier = TextClassifier(**run_settings)
    text_classifier.run_main_pipeline()
|