Spaces:
Runtime error
Runtime error
| import streamlit as st | |
| import pandas as pd | |
| from matplotlib import pyplot as plt | |
| import twint | |
| import nest_asyncio | |
| import multiprocessing.pool | |
| import functools | |
| from transformers import AutoModelForSequenceClassification | |
| from transformers import TFAutoModelForSequenceClassification | |
| from transformers import AutoTokenizer | |
| import numpy as np | |
| from scipy.special import softmax | |
| import csv | |
| import urllib.request | |
| import IPython.display as ipd | |
# Show a placeholder message before the module-level setup that follows runs.
st.write("Loading...")
# Preprocess text (username and link placeholders)
def preprocess(text):
    """Replace user mentions and links in *text* with fixed placeholders.

    The text is split on single spaces. A token that starts with ``@`` and
    is longer than one character becomes ``@user``; a token that starts with
    ``http`` becomes ``http``. Every other token is kept unchanged, and the
    tokens are joined back together with single spaces.
    """

    def _mask(token):
        # Apply the two substitutions in the same order as a sequential pass.
        if token.startswith('@') and len(token) > 1:
            token = '@user'
        if token.startswith('http'):
            token = 'http'
        return token

    return " ".join(_mask(token) for token in text.split(" "))
# Loading pretrained model
MODEL = "cardiffnlp/twitter-roberta-base-sentiment"

# Both objects are loaded before anything is saved: the saves below write
# into a local directory with the same name as MODEL, so the order of these
# statements must not be changed.
tokenizer = AutoTokenizer.from_pretrained(MODEL)
model = AutoModelForSequenceClassification.from_pretrained(MODEL)

# Persist the model first, then the tokenizer, under the MODEL path.
for _artifact in (model, tokenizer):
    _artifact.save_pretrained(MODEL)
# Func to get a score using the above model
def combined_score(text, max_length=512):
    """Return a single sentiment score for *text*.

    The text is passed through ``preprocess``, tokenized with the
    module-level ``tokenizer`` and fed to the module-level ``model``. The
    logits of the first sequence in the output are converted to
    probabilities with softmax.

    Args:
        text: The raw tweet text.
        max_length: Maximum number of tokens kept by the tokenizer; longer
            inputs are truncated so they cannot exceed the encoder's input
            limit. NOTE(review): 512 is the usual limit for RoBERTa-base
            models - confirm against this model's config.

    Returns:
        ``scores[2] - scores[0]``, i.e. the positive probability minus the
        negative probability, so the value lies between -1 and 1.
    """
    text = preprocess(text)
    encoded_input = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        max_length=max_length,
    )
    output = model(**encoded_input)
    scores = softmax(output[0][0].detach().numpy())
    # scores = [negative, neutral, positive]
    return scores[2] - scores[0]
# https://stackoverflow.com/questions/492519/timeout-on-a-function-call
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds.

    The decorated function runs in the single worker thread of a fresh
    ``ThreadPool`` while the caller waits for at most ``max_timeout``
    seconds for the result.

    Raises:
        multiprocessing.TimeoutError: if no result is available within
            ``max_timeout`` seconds. The worker thread is not interrupted;
            it keeps running in the background.
        Exception: anything raised by the decorated function is re-raised
            in the caller.
    """
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)  # keep the wrapped function's name/docstring
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            try:
                async_result = pool.apply_async(item, args, kwargs)
                # raises a TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
            finally:
                # close() does not block: it stops the pool from accepting
                # work so its threads exit once the submitted task is done,
                # instead of leaking one pool per call.
                pool.close()
        return func_wrapper
    return timeout_decorator
# Getting tweets from a user
def get_tweets(username, limit=500, save_name=None):
    """Fetch tweets for *username* with twint.

    Args:
        username: User of interest.
        limit: Max number of tweets to fetch (increments of 20).
        save_name: Optional path; when given, the result is also written
            there as CSV.

    Returns:
        The object twint leaves in ``twint.storage.panda.Tweets_df`` after
        the search. This is the library's module-level result, not a copy.
    """
    # NOTE: nest_asyncio.apply() was disabled here; it helps avoid
    # "RuntimeError: This event loop is already running".

    # Setup config
    c = twint.Config()      # Create a config object to store our settings
    c.Limit = limit         # Max number of tweets to fetch (increments of 20)
    c.Username = username   # User of interest
    c.Pandas = True         # Store tweets in a dataframe
    c.Hide_output = True    # Avoid printing out tweets

    # Run the search
    twint.run.Search(c)

    # Get the results and optionally save to a file as well
    df = twint.storage.panda.Tweets_df
    if save_name is not None:
        df.to_csv(save_name)
    return df
def _plot_sentiment(tweets):
    """Render hexbin and scatter plots of sentiment against tweet length."""
    fig, axs = plt.subplots(1, 2, figsize=(12, 6))
    axs[0].hexbin(tweets['tweet_length'], tweets['sentiment'],
                  gridsize=20, bins=12, cmap='inferno')
    axs[0].set_title('Tweet Sentiment and Length')
    axs[1].scatter(tweets['tweet_length'], tweets['sentiment'])
    axs[1].set_title('Tweet Sentiment vs Length')
    plt.setp(axs, xlabel='Tweet Length', ylabel='Sentiment')
    st.pyplot(fig)
    # pyplot keeps every figure it creates until it is closed; close it so
    # figures do not pile up each time the script runs.
    plt.close(fig)


title = st.title('Twitter Sentiment Map Thingee')

# NOTE(review): the original indentation was not recoverable; the result
# handling is placed inside the form, following the Streamlit form example
# this code appears to be based on - confirm the intended nesting.
with st.form("my_form"):
    st.write("Parameters:")
    user = st.text_input("Twitter Username")
    n_tweets = st.slider('How Many Tweets', 20, 2000, 20)
    # Every form must have a submit button.
    submitted = st.form_submit_button("Submit")

    if submitted and not user.strip():
        # Nothing to search for; avoid running a query with a blank name.
        st.warning("Please enter a Twitter username.")
    elif submitted:
        st.write("Fetching user", user, "n_tweets", n_tweets)
        tweets = get_tweets(user.strip(), limit=n_tweets)
        if tweets is None or tweets.empty or 'tweet' not in tweets.columns:
            # Without a 'tweet' column the scoring below would raise.
            st.warning("No tweets found for this user.")
        else:
            st.write("Resulting dataframe shape:", tweets.shape)
            st.write("Calculating sentiments...")
            tweets['sentiment'] = tweets['tweet'].map(combined_score)
            tweets['tweet_length'] = tweets['tweet'].map(len)
            st.write("Average sentiment:", tweets['sentiment'].mean())
            _plot_sentiment(tweets)