Spaces:
Runtime error
Runtime error
File size: 14,063 Bytes
5214b07 dc54afb 5214b07 dc54afb eceff29 5214b07 dc67c78 12597ef 4e39d19 5214b07 4e39d19 c3a63c7 44e11ec eceff29 ad1a1ba eceff29 dc67c78 c706f5e 32119e0 725b13f 1ace546 5cf061d 130dfd8 ca6da9d 130dfd8 ae34e1d 130dfd8 2a4df2c 1ace546 aded009 672130a aded009 672130a aded009 672130a aded009 0e7f596 4e39d19 32119e0 ad1a1ba 32119e0 1ace546 32119e0 130dfd8 32119e0 1ace546 32119e0 1ace546 32119e0 6f26738 725b13f 32119e0 1ace546 32119e0 f8f979f 32119e0 6f26738 32119e0 1ace546 32119e0 c5f746c 32119e0 6fb109c 1ace546 32119e0 1ace546 fa0430c 1ace546 130dfd8 5ce4dc5 130dfd8 5ce4dc5 672130a 5ce4dc5 672130a 5ce4dc5 672130a 5cf061d 672130a 5cf061d 5ce4dc5 130dfd8 0e7f596 5ce4dc5 0e7f596 130dfd8 44e11ec fe688af 5cf061d 44e11ec 456b287 44e11ec 32119e0 456b287 5cf061d 456b287 5cf061d 456b287 32119e0 5214b07 fe688af 5214b07 fe688af 5214b07 fe688af f8f979f fe688af f8f979f 1903058 0b5fde3 6fb109c f8f979f 0e7f596 b944971 0e7f596 0b5fde3 1903058 f8f979f dc54afb f8f979f 0e7f596 15ca093 b944971 9bb5623 da62f8f 32119e0 fe688af 456b287 5214b07 b944971 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 |
import os
import time
import warnings
from datetime import date
import openai
import pandas as pd
import regex as re
from dotenv import find_dotenv, load_dotenv
from pandas.core.common import SettingWithCopyWarning
from twitterscraper import TwitterScraper
from functions import functions as f
# Ignore every SettingWithCopyWarning raised from here on.
warnings.simplefilter("ignore", SettingWithCopyWarning)

# ROOT_PATH is the parent of the directory that contains this file.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Load the dotenv file located by find_dotenv(), then read the OpenAI token
# from the process environment (None when the variable is not set).
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_AUTHTOKEN = os.getenv("OPENAI_AUTHTOKEN")
class TextClassifier:
    """
    Scrapes a user's tweets and labels them with OpenAI completions.

    The scraped tweets live in ``self.df``. The classification methods add the columns
    'topic', 'sentiment' and 'target' to it, and the result can be persisted to a csv file.
    """

    # Lower-cased model answers that denote the same target, mapped to one canonical name.
    _TARGET_ALIASES = {
        "v": "Vänsterpartiet",
        "mp": "Miljöpartiet",
        "the swedish green party": "Miljöpartiet",
        "s": "Socialdemokraterna",
        "the swedish social democratic party": "Socialdemokraterna",
        "c": "Centerpartiet",
        "l": "Liberalerna",
        "kd": "Kristdemokraterna",
        "m": "Moderaterna",
        "the swedish moderate party": "Moderaterna",
        "the moderate party": "Moderaterna",
        "sd": "Sverigedemokraterna",
        "the swedish government": "Regeringen",
    }

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier, scrapes the user's tweets and sets the OpenAI api key.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. None (the default) means today's date,
            evaluated when the classifier is created rather than when the module is imported.
        :param user_name: name of the user whose tweets are scraped. Must not be None.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises Exception: if no acceptable scrape result was obtained and more than 10 seconds
            have passed since the first attempt.
        """
        # Make sure user_name is given
        assert user_name is not None, "user_name cannot be empty"
        if to_date is None:
            to_date = str(date.today())
        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name
        # Retry the scrape until it returns between 1 and num_tweets rows.
        # A failed attempt made more than 10 seconds after the first one aborts.
        start_time = time.time()
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if 0 < len(self.df) <= num_tweets:
                break
            if time.time() - start_time > 10:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")
        # Make id integer typed. The column is replaced (instead of written through
        # .loc[:, 'id']) so that it takes the dtype of the converted values.
        self.df['id'] = self.df['id'].apply(int)
        # The key is read from the environment; never hard-code it in this file.
        openai.api_key = OPENAI_AUTHTOKEN

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.
        All newlines are removed and surrounding whitespace is stripped; spaces between
        words are kept.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        classification_clean = classification_unclean.replace('\n', "")
        return classification_clean.strip()

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.
        :param text: string of the tweet text.
        :return: lower-cased, cleaned up completion text returned by the model.
        """
        assert isinstance(text, str)
        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0.0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            logprobs=5
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_sentiment_results(classification_unclean)
        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.
        Adds a 'sentiment' column, computed from the 'tweet' column, to a copy of self.df
        and stores that copy back in self.df.
        :return: the updated dataframe.
        """
        df_sentiment = self.df.copy()
        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    def analyze_sentiment(self, text: str, sentiment: str):
        """
        Analyzes the sentiment of a text using OpenAI, i.e. asks the model for its target.
        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: the target named by the model with parentheses removed, replaced by a
            canonical name when it matches a known alias, or "N/A" when the answer is
            longer than 50 characters.
        """
        # NOTE(review): the original carried a disabled assertion stating that the prompt
        # had to be fixed before this method is run. The prompt below also contains literal
        # backslash-n sequences ("\\n") instead of newlines - confirm whether that is intended.
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\\n.TARGET should consist of less than 5 words.\\nTARGET="
        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        # Normalise whitespace first so the length filter and the alias lookup below
        # see the bare answer.
        analyzed_sentiment = self.cleanup_sentiment_results(response.choices[0]['text'])
        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced
        # If the prediction is bad
        # -> set target value to N/A (not applicable)
        if len(analyzed_sentiment) > 50:
            analyzed_sentiment = "N/A"
        # An attempt to merge target responses that should be the same
        analyzed_sentiment = analyzed_sentiment.replace("(", "").replace(")", "").strip()
        return self._TARGET_ALIASES.get(analyzed_sentiment.lower(), analyzed_sentiment)

    def analyze_sentiment_of_tweets(self):
        """
        Analyzes the sentiment of a user's tweets.
        Adds a 'target' column, computed row by row from the 'tweet' and 'sentiment'
        columns, to a copy of self.df and stores that copy back in self.df.
        :return: the updated dataframe.
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."
        df_sentiment = self.df.copy()
        df_sentiment['target'] = df_sentiment.apply(lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']),
                                                    axis=1)
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        :return: lower-cased, cleaned up completion text returned by the model.
        """
        assert isinstance(text, str)
        prompt_string = "Classify this tweet with a general topic and two sub-topics:\n\""
        prompt_string += text
        prompt_string += "\".\nGeneral topic: \nSub topic 1: \nSub topic 2:\n. The classifications should not be " \
                         "more than 5 words. Numerate each topic in the output. END "
        response = openai.Completion.create(
            # Use the configured model, like the other classification methods do.
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=892,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)
        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.
        Adds a 'topic' column, computed from the 'tweet' column, to a copy of self.df
        and stores that copy back in self.df.
        :return: the updated dataframe.
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Cleans up the results of the topic classification by turning newlines into spaces.
        :param text: string of the classification result.
        :return: cleaned up string.
        """
        new_item = text.replace("\n", " ")
        # NOTE(review): as written this replaces a single space with a single space, which
        # changes nothing - presumably it was meant to collapse double spaces; confirm.
        new_item = new_item.replace(" ", " ")
        return new_item

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        When appending, no header is written, so the rows are added as they are.
        :param filename: filename of csv file
        :return: None
        """
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            self.df.to_csv(filename, mode='a', header=False, index=False)
        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicates from csv file.
        The first occurrence of every record is kept, in the original order, and the file
        is rewritten in place.
        :param filename: filename of csv file
        :return: None
        """
        import csv  # only needed by this method

        # Compare whole csv records rather than physical lines, so that quoted fields
        # containing newlines are treated as one unit. newline='' is what the csv module
        # expects for both reading and writing.
        with open(filename, 'r', encoding='utf-8', newline='') as csv_file:
            rows = list(csv.reader(csv_file))
        seen = set()
        unique_rows = []
        for row in rows:
            key = tuple(row)
            if key not in seen:
                seen.add(key)
                unique_rows.append(row)
        with open(filename, 'w', encoding='utf-8', newline='') as csv_file:
            csv.writer(csv_file, lineterminator=os.linesep).writerows(unique_rows)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes tweets that have already been classified.
        Only the rows of self.df whose 'sentiment' is null are kept; they are then written
        to the csv file.
        :param filename: filename of csv file
        :return: None
        """
        self.df = self.df[self.df['sentiment'].isnull()]
        self.df_to_csv(filename)

    def split_topics_into_columns(self):
        """
        Splits the topics into columns.
        Adds the columns 'main_topic', 'sub_topic_1' and 'sub_topic_2' to self.df, built
        from the result of f.separate_string applied to the 'topic' column.
        :return: None
        """
        df_topic = self.df.copy()
        # presumably separate_string returns three items per topic string; verify against functions
        df_topic['topic_temp'] = df_topic['topic'].apply(f.separate_string)
        # Reuse df_topic's index so the index-based merge below lines the rows up even
        # when self.df does not have a 0..n-1 index (e.g. after rows were filtered out).
        df_topic_split = pd.DataFrame(df_topic['topic_temp'].tolist(),
                                      columns=['main_topic', 'sub_topic_1', 'sub_topic_2'],
                                      index=df_topic.index)
        self.df = df_topic.merge(df_topic_split, how='left', left_index=True, right_index=True)
        self.df.drop(['topic_temp'], axis=1, inplace=True)

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        #We presume that all tweets inside the twitterdata.csv file are already classified.
        Tweets whose id is already in the csv file are not classified again; their stored
        rows are put back into self.df together with the newly classified ones.
        :param filename: filename of csv file
        :return: None
        """
        if not os.path.exists(filename):
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topics...")
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiments...")
            self.df = self.classify_sentiment_of_tweets()
            print("Waiting for 1 minute... before analyzing targets...")
            time.sleep(65)
            self.df = self.analyze_sentiment_of_tweets()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

        # Fetch tweets from csv file
        already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
        print("Already classified tweets: {}".format(already_classified_df.shape[0]))
        # Stored rows of the tweets that were scraped now and are already classified
        temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
        # Keep only the scraped tweets that are not yet in the csv file
        self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]
        # Only classify non-empty rows
        if self.df.shape[0] > 0:
            print("Classifying topic of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_sentiment_of_tweets()
            print("Waiting for 1 minute... before analyzing targets...")
            time.sleep(65)
            self.df = self.analyze_sentiment_of_tweets()
            print("Writing to csv...")
            self.df_to_csv(filename)
            # Concatenate temp_df and self.df
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
            return None

        self.df = pd.concat([temp_df, self.df], ignore_index=True)
        print("No new tweets to classify.")
        return None

    def get_dataframe(self):
        """
        Returns the dataframe.
        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return: string naming the user and the model.
        """
        return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "."
if __name__ == "__main__":
    # Script entry point: build a classifier for one user and a fixed date range,
    # then run the whole classification pipeline with the default csv file.
    text_classifier = TextClassifier(
        from_date="2020-01-01",
        to_date="2020-01-31",
        user_name='jimmieakesson',
        num_tweets=20,
    )
    text_classifier.run_main_pipeline()
|