import streamlit as st import pandas as pd import twint import nest_asyncio import multiprocessing.pool import functools from transformers import AutoModelForSequenceClassification from transformers import TFAutoModelForSequenceClassification from transformers import AutoTokenizer import numpy as np from scipy.special import softmax import csv import urllib.request import IPython.display as ipd # Preprocess text (username and link placeholders) def preprocess(text): new_text = [] for t in text.split(" "): t = '@user' if t.startswith('@') and len(t) > 1 else t t = 'http' if t.startswith('http') else t new_text.append(t) return " ".join(new_text) # Loading pretrained model MODEL = 'cardiffnlp/twitter-roberta-base-sentiment' tokenizer = AutoTokenizer.from_pretrained(MODEL) model = AutoModelForSequenceClassification.from_pretrained(MODEL) model.save_pretrained(MODEL) # Func to get a score using the above model def combined_score(text): text = preprocess(text) encoded_input = tokenizer(text, return_tensors='pt') output = model(**encoded_input) scores = output[0][0].detach().numpy() scores = softmax(scores) return -scores[0] + scores[2] # scores = [negative, neutral, positive] # https://stackoverflow.com/questions/492519/timeout-on-a-function-call def timeout(max_timeout): """Timeout decorator, parameter in seconds.""" def timeout_decorator(item): """Wrap the original function.""" @functools.wraps(item) def func_wrapper(*args, **kwargs): """Closure for function.""" pool = multiprocessing.pool.ThreadPool(processes=1) async_result = pool.apply_async(item, args, kwargs) # raises a TimeoutError if execution exceeds max_timeout return async_result.get(max_timeout) return func_wrapper return timeout_decorator # Getting tweets from a user @timeout(120.0) def get_tweets(username, limit=500, save_name=None): #nest_asyncio.apply() # Helps avoid RuntimeError: This event loop is already running # Setup config c = twint.Config() # Create a config object to store our settings c.Limit = limit # Max number of tweets to fetch (increments of 20) c.Username = username # User of interest c.Pandas = True # Store tweets in a dataframe c.Hide_output = True # Avoid printing out tweets # Run the seearch twint.run.Search(c) # Get the results and optionally save to a file as well df = twint.storage.panda.Tweets_df if save_name != None: df.to_csv(save_name) return df st.title('Test') with st.form("my_form"): st.write("Inside the form") user = st.text_input("Twitter Username") n_tweets = st.slider('How Many Tweets', 20, 2000, 20) # Every form must have a submit button. submitted = st.form_submit_button("Submit") if submitted: st.write("Fetching user", user, "n_tweets", n_tweets) tweets = get_tweets(user, limit=n_tweets) st.write(st.dataframe(tweets.head())) tweets['sentiment'] = tweets['tweet'].map(lambda s: combined_score(s)) st.write(st.dataframe(tweets[['tweet', 'sentiment']].head())) st.write("Outside the form")