File size: 10,256 Bytes
eceff29
5cf061d
dc67c78
130dfd8
 
44e11ec
12597ef
c3a63c7
44e11ec
 
eceff29
ad1a1ba
 
 
eceff29
dc67c78
c706f5e
32119e0
725b13f
1ace546
5cf061d
130dfd8
 
 
 
 
 
 
ca6da9d
 
 
 
 
 
 
 
130dfd8
 
2a4df2c
 
 
1ace546
32119e0
1ace546
32119e0
ad1a1ba
32119e0
 
 
1ace546
 
 
 
 
32119e0
 
 
 
 
 
130dfd8
 
 
 
 
32119e0
 
 
 
 
 
 
 
 
1ace546
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
1ace546
32119e0
 
 
6f26738
725b13f
32119e0
1ace546
 
32119e0
 
1ace546
32119e0
 
 
 
 
 
6f26738
32119e0
 
 
1ace546
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c5f746c
32119e0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1ace546
32119e0
1ace546
 
 
 
 
 
 
 
fa0430c
 
 
 
1ace546
130dfd8
5ce4dc5
130dfd8
 
5ce4dc5
 
 
 
 
 
 
 
 
 
5cf061d
 
 
 
 
 
 
 
5ce4dc5
 
 
 
 
 
 
 
130dfd8
5ce4dc5
 
 
130dfd8
44e11ec
5ce4dc5
5cf061d
 
 
 
44e11ec
 
 
 
 
 
 
 
 
 
32119e0
44e11ec
5cf061d
 
 
 
 
 
 
 
 
 
 
32119e0
9bb5623
da62f8f
 
 
 
 
32119e0
9bb5623
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import openai
import csv
import regex as re
from twitterscraper import TwitterScraper
from datetime import date
import os
from dotenv import find_dotenv, load_dotenv

# Set one directory up into ROOT_PATH
ROOT_PATH = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

dotenv_path = find_dotenv()
load_dotenv(dotenv_path)
OPENAI_AUTHTOKEN = os.environ.get("OPENAI_AUTHTOKEN")


class TextClassifier:
    def __init__(self, model_name="text-davinci-002", from_date='2022-01-01', to_date=str(date.today()),

                 user_name='jimmieakesson',
                 num_tweets=20, ):
        """
        Initializes the TextClassifier.
        :param model_name: name of the model from openai.
        :param from_date: string of the format 'YYYY-MM-DD'.
        :param to_date: string of the format 'YYYY-MM-DD'.
        :param num_tweets: integer value of the maximum number of tweets to be scraped.
        """
        # Make sure to_date is later than from_date
        assert from_date < to_date, "from_date must be earlier than to_date"
        # Make sure the dates are in the correct format
        assert re.match(r'^\d{4}-\d{2}-\d{2}$', from_date) is not None, "from_date must be in the format YYYY-MM-DD"
        # Make sure user_name is not empty
        assert user_name is not None, "user_name cannot be empty"
        # Make sure num_tweets is a positive integer
        assert num_tweets > 0, "num_tweets must be a positive integer"

        self.model_name = model_name
        self.from_date = from_date
        self.to_date = to_date
        self.num_tweets = num_tweets
        self.user_name = user_name
        self.ts = TwitterScraper.TwitterScraper(from_date, to_date, num_tweets)
        self.df = self.ts.scrape_by_user(user_name)
        # self.api_key = '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'
        openai.api_key = OPENAI_AUTHTOKEN

    @staticmethod
    def cleanup_sentiment_results(classification_unclean):
        """
        Cleans up the results of the sentiment classification.
        :param classification_unclean: string of the classification result.
        :return: cleaned up string.
        """
        classification_clean = classification_unclean.replace('\n\n', "")
        classification_clean = classification_clean.replace('\n', "")
        if classification_clean.startswith(" "):
            classification_clean = classification_clean.replace(" ", "")

        return classification_clean

    def classify_sentiment(self, text: str):
        """
        Classifies the sentiment of a text.
        """
        assert isinstance(text, str)

        prompt_string = "Classify one sentiment for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nSupport,\nOpposition,\nCriticism,\nPraise,\nDisagreement," \
                         "\nAgreement,\nSkepticism,\nAdmiration,\nAnecdotes,\nJokes,\nMemes,\nSarcasm,\nSatire," \
                         "\nQuestions,\nStatements,\nOpinions,\nPredictions.\nSENTIMENT="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0.0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            logprobs=5
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_sentiment_results(classification_unclean)

        return classification_clean.lower()

    def classify_sentiment_of_tweets(self):
        """
        Classifies the sentiment of a user's tweets.
        """
        df_sentiment = self.df.copy()

        df_sentiment['sentiment'] = df_sentiment['tweet'].apply(self.classify_sentiment)
        self.df = df_sentiment
        return self.df

    def analyze_sentiment(self, text: str, sentiment: str):
        # TODO: fix prompt before running this method
        """
        Analyzes the sentiment of a text using OpenAI.
        :param text: string of the tweet text.
        :param sentiment:
        :return:
        """
        # assert 1 == 2, "Måste fixa prompt innan denna metod körs"
        prompt_string = "Who is the TARGET of this "
        prompt_string += sentiment
        prompt_string += " TWEET?\\nTWEET=\""
        prompt_string += text
        prompt_string += "\"\\n.TARGET should consist of less than 5 words.\\nTARGET="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )

        analyzed_sentiment = response.choices[0]['text']
        # Remove spaces at the start/end of the response
        if analyzed_sentiment.startswith(' '):
            analyzed_sentiment = analyzed_sentiment[1:]
        if analyzed_sentiment.endswith(' '):
            analyzed_sentiment = analyzed_sentiment[:-1]

        # Sometimes GPT-3 gives faulty results, so a simple filter is introduced
        # If the prediction is bad
        # -> set target value to N/A (not applicable)
        if len(analyzed_sentiment) > 50:
            analyzed_sentiment = "N/A"

        # An attempt to merge target responses that should be the same
        analyzed_sentiment = re.sub("\(", "", analyzed_sentiment)
        analyzed_sentiment = re.sub("\)", "", analyzed_sentiment)

        s_list = ["s", "the swedish social democratic party"]
        m_list = ["m", "the swedish moderate party", "the moderate party"]
        mp_list = ["mp", "the swedish green party"]

        if analyzed_sentiment.lower() == "v":
            analyzed_sentiment = "Vänsterpartiet"
        elif analyzed_sentiment.lower() == "mp":
            analyzed_sentiment = "Miljöpartiet"
        elif analyzed_sentiment.lower() in s_list:
            analyzed_sentiment = "Socialdemokraterna"
        elif analyzed_sentiment.lower() == "c":
            analyzed_sentiment = "Centerpartiet"
        elif analyzed_sentiment.lower() == "l":
            analyzed_sentiment = "Liberalerna"
        elif analyzed_sentiment.lower() == "kd":
            analyzed_sentiment = "Kristdemokraterna"
        elif analyzed_sentiment.lower() in m_list:
            analyzed_sentiment = "Moderaterna"
        elif analyzed_sentiment.lower() == "sd":
            analyzed_sentiment = "Sverigedemokraterna"
        elif analyzed_sentiment.lower() == "the swedish government":
            analyzed_sentiment = "Regeringen"

        return analyzed_sentiment

    def analyze_sentiment_of_tweets(self):
        """
        Analyzes the sentiment of a user's tweets.
        """
        # check if 'sentiment' column exists, raise exception if not
        assert 'sentiment' in self.df.columns, \
            "'sentiment' column does not exist. Please run classify_sentiment_of_tweets first."

        df_sentiment = self.df.copy()
        df_sentiment['target'] = df_sentiment.apply(lambda row: self.analyze_sentiment(row['tweet'], row['sentiment']),
                                                    axis=1)
        self.df = df_sentiment
        return self.df

    def classify_topic(self, text: str):
        """
        Classifies the topics of a text.
        :param text: string of the tweet text.
        """
        assert isinstance(text, str)

        prompt_string = "Classify one topic for this tweet:\n \""
        prompt_string += text
        prompt_string += "\" \nFor example:\nEconomy,\nEnvironment,\nHealth,\nPolitics,\nScience,\nSports,\nTechnology," \
                         "\nTransportation,\nWorld.\nTOPIC="

        response = openai.Completion.create(
            model=self.model_name,
            prompt=prompt_string,
            temperature=0,
            max_tokens=892,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
        )
        classification_unclean = response.choices[0]['text']
        classification_clean = self.cleanup_topic_results(classification_unclean)

        return classification_clean.lower()

    def classify_topics_of_tweets(self):
        """
        Classifies the topics of a user's tweets.
        """
        df_topic = self.df
        df_topic['topic'] = df_topic['tweet'].apply(self.classify_topic)
        return df_topic

    @staticmethod
    def cleanup_topic_results(prediction_dict, text):
        new_item = text.replace("\n", " ")
        new_item = new_item.replace("  ", " ")
        return new_item

    def df_to_csv(self, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Writes pandas df to csv file. If it already exists, it appends.
        :param filename:
        :return:
        """
        if not os.path.exists(filename):
            self.df.to_csv(filename, index=False)
        else:
            self.df.to_csv(filename, mode='a', header=False, index=False)

    def return_row_if_ID_exists(self, id: str, filename="{}/data/twitterdata.csv".format(ROOT_PATH)):
        """
        Checks if a ID is already in the Data.csv file and if it is, it returns the row
        :param id:
        :return:
        """
        with open(filename, 'r') as csvfile:
            datareader = csv.reader(csvfile)
            for row in datareader:
                if row[0] == id:
                    return row
            return None

    def __repr__(self):
        """
        Gives a string that describes which user is classified
        :return:
        """
        return "Classifier for user: " + self.user_name + " with model: " + self.model_name + "."

# if __name__ == "__main__":
#     import pandas as pd
#     from datetime import datetime
#     import os
#     # show all columns
#     pd.set_option('display.max_columns', None)
#
#     tc = TextClassifier(from_date="2019-01-01", to_date="2019-05-31", user_name='jimmieakesson', num_tweets=20)
#     tc.classify_sentiment_of_tweets()
#     # df = tc.analyze_sentiment_of_tweets()
#     # print(df)
#     df = tc.classify_topics_of_tweets()
#     print(df)
#     # save to csv in a folder under politweet with timestamp in name
#     df.to_csv(f"{datetime.now().strftime('%Y-%m-%d %H-%M-%S')}_tweets.csv")