File size: 12,710 Bytes
eceff29
5cf061d
dc67c78
130dfd8
 
44e11ec
12597ef
fe688af
c3a63c7
44e11ec
 
eceff29
ad1a1ba
 
 
eceff29
dc67c78
c706f5e
32119e0
725b13f
1ace546
5cf061d
130dfd8
 
 
 
 
 
 
ca6da9d
 
 
 
 
 
 
 
130dfd8
 
2a4df2c
 
 
1ace546
32119e0
1ace546
0e7f596
 
32119e0
ad1a1ba
32119e0
456b287
 
 
 
 
 
32119e0
 
1ace546
 
 
 
 
32119e0
 
 
 
 
 
130dfd8
 
 
 
 
32119e0
 
 
 
 
 
 
 
 
1ace546
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
1ace546
32119e0
 
 
6f26738
725b13f
32119e0
1ace546
 
32119e0
 
 
 
 
f8f979f
32119e0
 
6f26738
32119e0
 
 
1ace546
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5f746c
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1ace546
32119e0
1ace546
 
 
 
 
 
 
 
fa0430c
 
 
 
1ace546
130dfd8
5ce4dc5
130dfd8
 
5ce4dc5
 
 
 
 
 
 
 
 
 
5cf061d
 
 
 
 
 
 
 
5ce4dc5
 
 
 
 
 
 
 
130dfd8
0e7f596
5ce4dc5
0e7f596
 
130dfd8
44e11ec
fe688af
5cf061d
 
 
 
44e11ec
 
456b287
44e11ec
 
 
 
 
 
 
32119e0
456b287
 
 
 
5cf061d
456b287
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5cf061d
456b287
 
 
 
32119e0
fe688af
 
 
 
 
 
 
 
 
 
 
f8f979f
fe688af
f8f979f
1903058
 
f8f979f
0e7f596
 
 
 
f8f979f
 
 
 
 
 
 
 
 
 
1903058
 
f8f979f
 
 
 
 
 
 
 
 
0e7f596
fe688af
9bb5623
da62f8f
 
 
 
 
32119e0
fe688af
456b287
0e7f596
1903058
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
import openai
import csv
import regex as re
from twitterscraper import TwitterScraper
from datetime import date
import os
from dotenv import find_dotenv, load_dotenv
import pandas as pd

# ROOT_PATH is the absolute path of the directory one level above the directory that contains this file.
# It is used below to build the default location of the csv file.
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Locate a .env file (python-dotenv) and load its variables into the process environment.
dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
# OpenAI API key taken from the environment; None when OPENAI_AUTHTOKEN is not set.
OPENAI_AUTHTOKEN = os.environ.get("OPENAI_AUTHTOKEN")


class TextClassifier:
    """
    Scrapes the tweets of one user and labels them with an OpenAI completion model.

    The scraped tweets are kept in ``self.df`` (the code below relies on the columns 'id' and 'tweet').
    The classification methods add the columns 'topic', 'sentiment' and 'target' to that dataframe, and
    ``df_to_csv`` / ``run_main_pipeline`` persist the result to a csv file.
    """

    # Lower-cased model answers that denote the same actor, mapped to one canonical name.
    _TARGET_ALIASES = {
        "v": "Vänsterpartiet",
        "mp": "Miljöpartiet",
        "the swedish green party": "Miljöpartiet",
        "s": "Socialdemokraterna",
        "the swedish social democratic party": "Socialdemokraterna",
        "c": "Centerpartiet",
        "l": "Liberalerna",
        "kd": "Kristdemokraterna",
        "m": "Moderaterna",
        "the swedish moderate party": "Moderaterna",
        "the moderate party": "Moderaterna",
        "sd": "Sverigedemokraterna",
        "the swedish government": "Regeringen",
    }

    # Targets longer than this are treated as a failed prediction.
    _MAX_TARGET_LENGTH = 50

    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=str(date.today()),

                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier and scrapes the user's tweets.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'. NOTE: the default is evaluated once, when the
            class is defined, so a long-running process should pass the date explicitly.
        :param user_name: twitter user name whose tweets are scraped.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        :raises AssertionError: if any of the arguments is invalid.
        """
        # Validate the formats first: the chronological check below compares the strings
        # lexicographically, which is only meaningful for zero-padded 'YYYY-MM-DD' values.
        date_pattern = r'\d{4}-\d{2}-\d{2}'
        assert re.fullmatch(date_pattern, from_date) is not None, "from_date must be in the format YYYY-MM-DD"
        assert re.fullmatch(date_pattern, to_date) is not None, "to_date must be in the format YYYY-MM-DD"
        # Make sure to_date is later than from_date
        assert from_date < to_date, "from_date must be earlier than to_date"
        # Make sure user_name is neither None nor an empty string
        assert user_name, "user_name cannot be empty"
        # Make sure num_tweets is a positive integer
        assert num_tweets > 0, "num_tweets must be a positive integer"

        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name
        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.df = self.ts.scrape_by_user(user_name)
        # Force 64-bit ids explicitly: plain `int` can map to a 32-bit type on some platforms.
        self.df['id'] = self.df['id'].astype('int64')
        # The key is read from the environment (OPENAI_AUTHTOKEN); never hard-code credentials in source.
        openai.api_key = OPENAI_AUTHTOKEN

    def classify_topic_and_sentiment(self):
        """
        Classifies both the topic and the sentiment of the user's tweets.
        The result is not written to disk here; call df_to_csv for that.
        :return: the dataframe with the added 'topic' and 'sentiment' columns.
        """
        self.classify_topics_of_tweets()
        self.classify_sentiment_of_tweets()
        return self.df

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.
        Newlines are removed and surrounding whitespace is trimmed; spaces inside the answer are kept.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        return classification_unclean.replace('\n', "").strip()

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.
        :param text: string of the tweet text.
        :return: lower-cased, cleaned up answer of the model.
        """
        assert isinstance(text, str)

        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0.0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            logprobs=5
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_sentiment_results(classification_unclean)

        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.
        :return: the dataframe with the added 'sentiment' column.
        """
        df_sentiment = self.df.copy()

        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    def analyze_sentiment(self, text: str, sentiment: str):
        """
        Asks the model who the target of a tweet's sentiment is.
        :param text: string of the tweet text.
        :param sentiment: string of the sentiment.
        :return: the target, normalised to a canonical party name where a known alias is recognised,
            or "N/A" when the answer is too long to be a plausible target.
        """
        # Real newlines are used here, like in the other prompts of this class.
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\n.TARGET should consist of less than 5 words.\nTARGET="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )

        # Remove whitespace at the start/end of the response
        analyzed_sentiment = response.choices[0]['text'].strip()

        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced
        # If the prediction is bad
        # -> set target value to N/A (not applicable)
        if len(analyzed_sentiment) > self._MAX_TARGET_LENGTH:
            analyzed_sentiment = "N/A"

        # An attempt to merge target responses that should be the same:
        # drop parentheses, then map known aliases to one canonical name.
        analyzed_sentiment = analyzed_sentiment.replace("(", "").replace(")", "").strip()

        return self._TARGET_ALIASES.get(analyzed_sentiment.lower(), analyzed_sentiment)

    def analyze_sentiment_of_tweets(self):
        """
        Determines the target of the sentiment for each of the user's tweets.
        :return: the dataframe with the added 'target' column.
        :raises AssertionError: if the tweets have not been sentiment-classified yet.
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."

        df_sentiment = self.df.copy()
        df_sentiment['target'] = df_sentiment.apply(lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']),
                                                    axis=1)
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        :return: lower-cased, cleaned up answer of the model.
        """
        assert isinstance(text, str)

        prompt_string = "Classify one topic for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nEconomy,\nEnvironment,\nHealth,\nPolitics,\nScience,\nSports,\nTechnology," \
                         "\nTransportation,\nWorld.\nTOPIC="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=892,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)

        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.
        :return: the dataframe with the added 'topic' column.
        """
        df_topic = self.df.copy()
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        self.df = df_topic
        return self.df

    @staticmethod
    def cleanup_topic_results(text):
        """
        Cleans up the results of the topic classification.
        Every run of whitespace (including newlines) becomes a single space and the ends are trimmed,
        so that equal topics compare equal.
        :param text: string of the classification result.
        :return: cleaned up string.
        """
        return " ".join(text.split())

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends. If not, it creates. It also removes duplicates.
        :param filename: path of the csv file; missing parent directories are created.
        :return: None
        """
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)

        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            # The header is already in the file, so only the rows are appended.
            self.df.to_csv(filename, mode='a', header=False, index=False)

        self.remove_duplicates_from_csv(filename)

    @staticmethod
    def remove_duplicates_from_csv(filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Removes duplicates from csv file, keeping the first occurrence of every record.
        The file is parsed as csv (not line by line), so quoted fields that span several lines,
        e.g. tweets containing line breaks, are compared as a whole. Blank lines are dropped.
        :param filename: filename of csv file
        :return: None
        """
        # newline='' lets the csv module handle line breaks inside quoted fields itself.
        with open(filename, 'r', newline='', encoding='utf-8') as f:
            rows = list(csv.reader(f))

        seen = set()
        unique_rows = []
        for row in rows:
            key = tuple(row)
            if not row or key in seen:
                continue
            seen.add(key)
            unique_rows.append(row)

        with open(filename, 'w', newline='', encoding='utf-8') as f:
            # os.linesep is the line terminator pandas' to_csv uses by default.
            csv.writer(f, lineterminator=os.linesep).writerows(unique_rows)

    def remove_already_classified_tweets(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Keeps only the tweets whose 'sentiment' is still missing and writes them to the csv file.
        NOTE(review): the rows written here are the ones without a sentiment - confirm that this is intended.
        :param filename: filename of csv file
        :return: None
        :raises KeyError: if the dataframe has no 'sentiment' column.
        """
        self.df = self.df[self.df['sentiment'].isnull()]
        self.df_to_csv(filename)

    def get_tweet_by_id(self, id, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Returns tweet by id.
        Not implemented yet: the method has no body and currently always returns None.
        :param id: id of tweet
        :param filename: filename of csv file
        :return: tweet
        """

    def run_main_pipeline(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Classifies the topics/sentiments of a user's tweets and stores them in the csv file.
        We presume that all tweets inside the csv file are already classified, so only tweets whose id is
        not in the file yet are sent to the model. Afterwards self.df holds all scraped tweets: the
        previously classified ones (with their stored labels) followed by the newly classified ones.
        :param filename: filename of csv file
        :return: None
        """
        if os.path.exists(filename):
            already_classified_df = pd.read_csv(filename)
            print("Already classified tweets: {}".format(already_classified_df.shape[0]))
            is_known = self.df['id'].isin(already_classified_df['id'])
            # Scraped tweets that are already in the file: attach the columns the file has on top of the
            # scraped ones (the stored labels), so they do not come back unlabelled.
            stored_columns = [col for col in already_classified_df.columns if col not in self.df.columns]
            stored_labels = already_classified_df[['id'] + stored_columns].drop_duplicates(subset='id')
            temp_df = self.df[is_known].merge(stored_labels, on='id', how='left')
            # Keep only the tweets that still have to be classified
            self.df = self.df[~is_known]
            print("Classifying topic of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiment of {} tweets...".format(self.df.shape[0]))
            self.df = self.classify_sentiment_of_tweets()
            print("Writing to csv...")
            self.df_to_csv(filename)
            # Concatenate temp_df and self.df
            self.df = pd.concat([temp_df, self.df], ignore_index=True)
            print("Appended {}.".format(filename))
            return None
        else:
            print("No csv file found. Continuing without removing already classified tweets.")
            print("Classifying topics...")
            self.df = self.classify_topics_of_tweets()
            print("Classifying sentiments...")
            self.df = self.classify_sentiment_of_tweets()
            print("Writing to csv file...")
            self.df_to_csv(filename)
            print("Created {}.".format(filename))
            return None

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return: description containing the user name and the model name.
        """
        return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "."


if __name__ == "__main__":
    # Script entry point: scrape up to 200 tweets of one user for a fixed one-month window,
    # then classify them and store the result in the default csv file.
    classifier = TextClassifier(
        from_date="2020-02-10",
        to_date="2020-03-10",
        user_name='jimmieakesson',
        num_tweets=200,
    )
    classifier.run_main_pipeline()