File size: 5,628 Bytes
8158335
 
 
 
 
2a3e5e3
 
 
af2c220
 
 
 
 
8158335
 
 
 
 
 
 
 
 
 
 
 
9f54156
 
 
 
 
 
 
8158335
 
9f54156
8158335
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2a3e5e3
 
 
8158335
2a3e5e3
8158335
2a3e5e3
 
 
8158335
 
2a3e5e3
 
 
 
 
8158335
 
2a3e5e3
 
8158335
2a3e5e3
 
 
 
 
 
 
 
8158335
2a3e5e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
af2c220
2a3e5e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import re
import nltk
from typing import List
from transformers import pipeline
from tqdm import tqdm
import numpy as np
import numpy as np
import scipy
from transformers import AutoModelForSequenceClassification
from transformers import TFAutoModelForSequenceClassification
from transformers import AutoTokenizer
from scipy.special import softmax
import os

def tweet_cleaner(tweet: str) -> str:
    """
    Clean a tweet by removing http/https URLs and normalizing whitespace.

    Note: despite what an earlier description claimed, this does NOT remove
    @ mentions or hashtags — only URL tokens are stripped.

    Args:
        tweet (str): A single tweet. Non-string inputs are coerced with
            ``str()``; if coercion fails the value is returned unchanged.

    Returns:
        str: The cleaned tweet with URLs removed and runs of whitespace
        collapsed to single spaces.
    """
    if not isinstance(tweet, str):
        try:
            tweet = str(tweet)
        except Exception as e:
            # Best-effort: if the value cannot be stringified, hand it back.
            print(f"Error converting tweet to string: {e}")
            return tweet

    # One pattern covers every placement the original handled with five
    # separate passes (leading space, trailing space, newline-adjacent,
    # bare): drop the URL token itself...
    tweet = re.sub(r"https?:\S+", "", tweet)
    # ...then collapse all whitespace runs; this also absorbs any stray
    # space the removed URL left behind.
    return " ".join(tweet.split()).strip()


def is_boring_tweet(tweet):
    """Return True when the tweet has fewer than 3 'interesting' words.

    A word is interesting when it contains none of the boring markers:
    "http", "@", or "#" (checked case-insensitively).
    """
    markers = ("http", "@", "#")
    interesting = 0
    for token in tweet.split():
        lowered = token.lower()
        if all(marker not in lowered for marker in markers):
            interesting += 1
    return interesting < 3


def fix_text(text):
    """
    Decode the HTML entities Twitter escapes in tweet text.

    Decodes "&lt;" and "&gt;" before "&amp;" — the original replaced
    "&amp;" first, which double-unescaped a literal "&amp;lt;" into "<"
    instead of the correct "&lt;".

    Args:
        text (str): Raw tweet text possibly containing HTML entities.

    Returns:
        str: Text with &lt;, &gt;, and &amp; decoded exactly once.
    """
    text = text.replace("&lt;", "<")
    text = text.replace("&gt;", ">")
    text = text.replace("&amp;", "&")
    return text


def twitter_sentiment_api_score(
    tweet_list: list = None, return_argmax: bool = True, use_api=False
):
    """
    Score a list of tweets with the cardiffnlp twitter-roberta-base sentiment
    model, either via the Hugging Face inference API or a local model.

    Args:
        tweet_list (list): Strings, one per tweet. ``None`` or empty returns
            an empty list (the previous version crashed iterating ``None``).
        return_argmax (bool): When True, each result dict also gets an
            "argmax" key with the highest-confidence label. NOTE: only
            honored on the API path; the local path always adds "argmax"
            (preserved from the original behavior).
        use_api (bool): True to call the hosted inference API; False to
            download and run the model locally.

    Returns:
        list[dict]: One dict per tweet mapping "positive"/"neutral"/
        "negative" to confidence floats in [0, 1], plus optionally "argmax".
    """
    # Guard: the original raised TypeError on the default None and did
    # pointless work for an empty list.
    if not tweet_list:
        return []

    if use_api:
        import requests

        # Hosted inference endpoint for the sentiment model.
        API_URL = "https://api-inference.huggingface.co/models/cardiffnlp/twitter-roberta-base-sentiment"
        # SECURITY: hard-coded credential checked into source — this token
        # should be revoked and loaded from an environment variable instead.
        headers = {"Authorization": "Bearer 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"}

        def query(payload):
            # POST the JSON payload and decode the JSON response body.
            response = requests.post(API_URL, headers=headers, json=payload)
            return response.json()

        # One request scores the whole batch of tweets.
        output = query(
            {
                "inputs": tweet_list,
            }
        )
    else:
        task = "sentiment"
        MODEL = f"cardiffnlp/twitter-roberta-base-{task}"
        tokenizer = AutoTokenizer.from_pretrained(MODEL)
        model = AutoModelForSequenceClassification.from_pretrained(MODEL)

        def _get_sentiment(text):  # typo fix: was "get_sentimet" (internal name)
            # Score a single cleaned tweet locally; label order matches the
            # model card for cardiffnlp/twitter-roberta-base-sentiment.
            labels = ["negative", "neutral", "positive"]
            text = tweet_cleaner(text)
            encoded_input = tokenizer(text, return_tensors="pt")
            output = model(**encoded_input)
            scores = softmax(output[0][0].detach().numpy())
            ranking = np.argsort(scores)[::-1]
            results = {
                labels[ranking[i]]: np.round(float(scores[ranking[i]]), 4)
                for i in range(scores.shape[0])
            }
            # RHS is evaluated before the "argmax" key exists, so the max is
            # taken over the three sentiment keys only.
            results["argmax"] = max(results, key=results.get)
            return results

        return [_get_sentiment(t) for t in tqdm(tweet_list)]

    # API path: map the model's raw LABEL_0/1/2 names onto readable keys.
    result = []
    for s in output:
        sentiment_dict = {}
        for d in s:
            if isinstance(d, dict):
                if d["label"] == "LABEL_2":
                    sentiment_dict["positive"] = d["score"]
                elif d["label"] == "LABEL_1":
                    sentiment_dict["neutral"] = d["score"]
                elif d["label"] == "LABEL_0":
                    sentiment_dict["negative"] = d["score"]
        if return_argmax and sentiment_dict:
            # Label with the highest confidence score.
            sentiment_dict["argmax"] = max(sentiment_dict, key=sentiment_dict.get)
        result.append(sentiment_dict)

    # One dict of sentiment scores per input tweet.
    return result