File size: 14,063 Bytes
5214b07
dc54afb
5214b07
 
dc54afb
eceff29
5214b07
dc67c78
12597ef
4e39d19
5214b07
 
 
4e39d19
 
c3a63c7
44e11ec
 
eceff29
ad1a1ba
 
 
eceff29
dc67c78
c706f5e
32119e0
725b13f
1ace546
5cf061d
130dfd8
 
 
 
 
 
 
ca6da9d
 
130dfd8
ae34e1d
130dfd8
2a4df2c
 
 
1ace546
aded009
672130a
 
aded009
 
672130a
aded009
 
672130a
 
aded009
0e7f596
4e39d19
32119e0
ad1a1ba
32119e0
 
 
1ace546
 
 
 
 
32119e0
 
 
 
 
 
130dfd8
 
 
 
 
32119e0
 
 
 
 
 
 
 
 
1ace546
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
1ace546
32119e0
 
 
6f26738
725b13f
32119e0
1ace546
 
32119e0
 
 
 
 
f8f979f
32119e0
 
6f26738
32119e0
 
 
1ace546
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5f746c
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6fb109c
1ace546
32119e0
1ace546
 
 
 
 
 
 
 
fa0430c
 
 
 
1ace546
130dfd8
5ce4dc5
130dfd8
 
5ce4dc5
 
 
 
672130a
5ce4dc5
672130a
 
5ce4dc5
672130a
5cf061d
 
 
 
 
672130a
5cf061d
5ce4dc5
 
 
 
 
 
 
 
130dfd8
0e7f596
5ce4dc5
0e7f596
 
130dfd8
44e11ec
fe688af
5cf061d
 
 
 
44e11ec
 
456b287
44e11ec
 
 
 
 
 
 
32119e0
456b287
 
 
 
5cf061d
456b287
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5cf061d
456b287
 
 
 
32119e0
5214b07
fe688af
5214b07
 
fe688af
5214b07
 
 
 
 
 
fe688af
 
 
 
 
f8f979f
fe688af
f8f979f
1903058
0b5fde3
6fb109c
f8f979f
0e7f596
b944971
0e7f596
 
0b5fde3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1903058
 
f8f979f
 
 
 
dc54afb
 
 
f8f979f
 
 
 
0e7f596
15ca093
 
 
 
 
 
b944971
9bb5623
da62f8f
 
 
 
 
32119e0
fe688af
456b287
5214b07
b944971
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
import os
import time
import warnings
from datetime import date

import openai
import pandas as pd
import regex as re
from dotenv import find_dotenv, load_dotenv
from pandas.core.common import SettingWithCopyWarning

from twitterscraper import TwitterScraper
from functions import functions as f

# Silence pandas' SettingWithCopyWarning for the whole process (the classifier assigns into scraped frames).
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

# ROOT_PATH is the parent of the directory that contains this file; default csv paths are built from it.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Locate a .env file and load it, then read the OpenAI key from the environment.
# OPENAI_AUTHTOKEN is None when the variable is not set.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_AUTHTOKEN = os.environ.get("OPENAI_AUTHTOKEN")


class TextClassifier:
    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=None,
                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier and scrapes the tweets that will be classified.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. When omitted (None), today's date is used; it is
            computed when the classifier is created, not when the module is imported.
        :param user_name: user name whose tweets are scraped. Must not be None or empty.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises AssertionError: if user_name is None or empty.
        :raises Exception: if no scrape returned between 1 and num_tweets tweets within roughly 10 seconds.
        """
        # Reject both None and '' (the message has always said "empty")
        assert user_name, "user_name cannot be empty"

        # Resolve the default here so a long-running process does not keep using the import-day date
        if to_date is None:
            to_date = str(date.today())

        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name

        # Re-scrape until a non-empty result with at most num_tweets rows arrives; give up after ~10 seconds.
        # The deadline is only checked between scrapes, so one slow scrape can overrun it.
        deadline = time.monotonic() + 10
        while True:
            self.df = self.ts.scrape_by_user(user_name)
            if 0 < len(self.df) <= num_tweets:
                break
            if time.monotonic() > deadline:
                raise Exception("Could not get enough tweets. Please try again. Perhaps try different time range.")

        # Make id as type int64 (astype replaces the column, so the dtype really changes)
        self.df['id'] = self.df['id'].astype('int64')

        # The key comes from the environment only; never paste API keys into source files or comments
        openai.api_key = OPENAI_AUTHTOKEN

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        classification_clean = classification_unclean.replace('\n\n', "")
        classification_clean = classification_clean.replace('\n', "")
        if classification_clean.startswith(" "):
            classification_clean = classification_clean.replace(" ", "")

        return classification_clean

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.
        """
        assert isinstance(text, str)

        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0.0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            logprobs=5
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_sentiment_results(classification_unclean)

        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.
        """
        df_sentiment = self.df.copy()

        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    def analyze_sentiment(self, text: str, sentiment: str):
        """
        Asks the OpenAI completion model who the target of a tweet's sentiment is.

        The raw answer is normalised by _normalize_target (whitespace, length filter, parentheses, party
        aliases) before it is returned.
        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: string naming the target, or "N/A" when the model's answer is too long to be a target.
        """
        # Real newlines separate the prompt sections; earlier the prompt contained the two literal
        # characters backslash + 'n', unlike the other prompts in this class.
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\n.TARGET should consist of less than 5 words.\nTARGET="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )

        return self._normalize_target(response.choices[0]['text'])

    @staticmethod
    def _normalize_target(raw_target):
        """
        Cleans a raw target answer and maps known aliases of Swedish parties to the party's Swedish name.
        :param raw_target: string as returned by the model.
        :return: cleaned target string, or "N/A" if the answer is longer than 50 characters.
        """
        # Collapse newlines/runs of whitespace first so that answers like "\n\nSD" still match an alias
        target = " ".join(raw_target.split())

        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced:
        # an overly long answer is not a usable target -> N/A (not applicable)
        if len(target) > 50:
            return "N/A"

        # An attempt to merge target responses that should be the same
        target = target.replace("(", "").replace(")", "").strip()

        party_aliases = {
            "v": "Vänsterpartiet",
            "mp": "Miljöpartiet",
            "the swedish green party": "Miljöpartiet",
            "s": "Socialdemokraterna",
            "the swedish social democratic party": "Socialdemokraterna",
            "c": "Centerpartiet",
            "l": "Liberalerna",
            "kd": "Kristdemokraterna",
            "m": "Moderaterna",
            "the swedish moderate party": "Moderaterna",
            "the moderate party": "Moderaterna",
            "sd": "Sverigedemokraterna",
            "the swedish government": "Regeringen",
        }
        # Unknown targets are returned unchanged (original casing kept)
        return party_aliases.get(target.lower(), target)

    def analyze_sentiment_of_tweets(self):
        """
        Analyzes the sentiment of a user's tweets.
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."

        df_sentiment = self.df.copy()
        df_sentiment['target'] = df_sentiment.apply(lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']),
                                                    axis=1)
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        """
        assert isinstance(text, str)

        prompt_string = "Classify this tweet with a general topic and two sub-topics:\n\""
        prompt_string += text
        prompt_string += "\".\nGeneral topic: \nSub topic 1: \nSub topic 2:\n. The classifications should not be " \
                         "more than 5 words. Numerate each topic in the output. END "
        response = openai.Completion.create(
            model="text-davinci-002",
            prompt=prompt_string,
            temperature=0,
            max_tokens=892,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)

        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        new_item = text.replace("\n", " ")
        new_item = new_item.replace("  ", " ")
        return new_item

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        :param filename:
        :return:
        """
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            self.df.to_csv(filename, mode='a', header=False, index=False)

        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicates from csv file.
        :param filename: filename of csv file
        :return: None
        """
        with open(filename, 'r') as f:
            lines = f.readlines()
        with open(filename, 'w') as f:
            for line in lines:
                if line not in lines[lines.index(line) + 1:]:
                    f.write(line)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes tweets that have already been classified.
        :param filename: filename of csv file
        :return: None
        """
        df = self.df
        df = df[df['sentiment'].isnull()]
        self.df = df
        self.df_to_csv(filename)

    def split_topics_into_columns(self):
        """
        Splits the 'topic' column into the columns 'main_topic', 'sub_topic_1' and 'sub_topic_2'.

        Each topic string is split with f.separate_string (assumed to yield three items per tweet, one per
        new column -- TODO confirm against functions.separate_string). The new columns are added to self.df.
        :return: None
        """
        df_topic = self.df.copy()
        split_topics = df_topic['topic'].apply(f.separate_string).tolist()
        # Reuse the tweets' own index: with a fresh 0..n-1 index the index-based merge below would pair
        # topics with the wrong rows (or with none) whenever self.df has been filtered or re-ordered.
        df_topic_split = pd.DataFrame(split_topics,
                                      index=df_topic.index,
                                      columns=['main_topic', 'sub_topic_1', 'sub_topic_2'])
        self.df = df_topic.merge(df_topic_split, how='left', left_index=True, right_index=True)

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets.
        #We presume that all tweets inside the twitterdata.csv file are already classified.
        :return: None
        """
        # Check if file exists, if not, create it
        if os.path.exists(filename):
            # Fetch tweets from csv file
            already_classified_df = pd.read_csv(filename, on_bad_lines='skip')
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))
            # Create a temporary df where values from already_classified_df that are not it self.df are stored
            temp_df = already_classified_df[already_classified_df['id'].isin(self.df['id'])]
            # Remove rows from self.df that are not in already_classified_df
            self.df = self.df[~self.df['id'].isin(already_classified_df['id'])]
            # Only classify non-empty rows
            if self.df.shape[0] > 0:
                print("Classifying topic of {} tweets...".format(self.df.shape[0]))
                self.df = self.classify_topics_of_tweets()
                print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
                self.df = self.classify_sentiment_of_tweets()
                print("Waiting for 1 minute... before analyzing targets...")
                time.sleep(65)
                self.df = self.analyze_sentiment_of_tweets()
                print("Writing to csv...")
                self.df_to_csv(filename)
                # Concatenate temp_df and self.df
                self.df = pd.concat([temp_df, self.df], ignore_index=True)
                print("Appended {}.".format(filename))
                return None
            else:
                self.df = pd.concat([temp_df, self.df], ignore_index=True)
                print("No new tweets to classify.")
                return None
        else:
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topics...")
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiments...")
            self.df = self.classify_sentiment_of_tweets()
            print("Waiting for 1 minute... before analyzing targets...")
            time.sleep(65)
            self.df = self.analyze_sentiment_of_tweets()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

    def get_dataframe(self):
        """
        Returns the dataframe.
        :return: dataframe
        """
        return self.df

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return:
        """
        return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "."


if __name__ == "__main__":
    # Script entry point: classify up to 20 tweets by 'jimmieakesson' from January 2020
    classifier = TextClassifier(
        from_date="2020-01-01",
        to_date="2020-01-31",
        user_name='jimmieakesson',
        num_tweets=20,
    )
    classifier.run_main_pipeline()