Spaces:
Runtime error
Runtime error
import glob
import os
import time
from math import sqrt
from pathlib import Path

import matplotlib

matplotlib.use('Agg')  # must run before pyplot is imported
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import regex as re
from matplotlib.figure import Figure

from functions import functions as f
from textclassifier import TextClassifier as tc
# Columns shown in the example table for each plot category (keyed by data column).
SELECTED_COLUMN_DICT = {
    'merged_topic': ['tweet', 'main_topic', 'sub_topic', 'synonym_topic', 'cos_sim_topic', 'merged_topic'],
    'sentiment': ['tweet', 'sentiment'],
    'merged_target': ['tweet', 'target', 'synonym_target', 'cos_sim_target', 'merged_target'],
}

# Values of the `username` column, index-aligned with USER_NAMES below.
USER_LIST = ['jimmieakesson', 'BuschEbba', 'annieloof', 'JohanPehrson', 'bolund', 'martastenevi', 'SwedishPM',
             'dadgostarnooshi']
# Display names offered in the UI, index-aligned with USER_LIST above.
USER_NAMES = ['Jimmie Åkesson', 'Ebba Busch', 'Annie Lööf', 'Johan Pehrson', 'Per Bolund', 'Märta Stenevi',
              'Magdalena Andersson', 'Nooshi Dadgostar']

CHOICE_LIST = ['Topic', 'Sentiment', 'Target']

# UI choice -> data column. Earlier revisions mapped 'Topic' to 'sub_topic' and
# 'Target' to 'target'; the values were changed to the merged columns.
PLOT_CHOICES_DICT = {'Topic': 'merged_topic', 'Sentiment': 'sentiment', 'Target': 'merged_target'}
# Data column -> UI choice, derived so the two mappings cannot drift apart.
PLOT_CHOICES_REVERSE_DICT = {column: choice for choice, column in PLOT_CHOICES_DICT.items()}

# Display name -> username, derived from the two aligned lists above.
UserNameDict = dict(zip(USER_NAMES, USER_LIST))

Columns = ['username', 'nlikes', 'nreplies', 'nretweets', 'main_topic', 'sub_topic', 'sentiment', 'target', 'tweet',
           'date', 'urls', 'id', 'class_tuple', 'user_id']

num_tweet = 1000  # passed to TextClassifier as `num_tweets`
LIMIT = 0.05  # pie-chart categories whose share is below this are lumped into "other"
def show_all_stats(see_full_stats):
    """Return the stored tweet table, or an empty frame when it is not wanted.

    :param see_full_stats: truthy to load ``<ROOT_PATH>/data/twitterdata.csv``.
    :return: the loaded ``pd.DataFrame``, or an empty ``pd.DataFrame`` when
        ``see_full_stats`` is falsy.
    """
    # Decide first, so the CSV is neither read nor required when the caller
    # does not want the table.
    if not see_full_stats:
        return pd.DataFrame()
    return pd.read_csv("{}/data/twitterdata.csv".format(tc.ROOT_PATH))
def fixChoicesCorrectOrder(choices):
    """Return the entries of ``Columns`` that occur in ``choices``, in ``Columns`` order."""
    ordered = []
    for column_name in Columns:
        if column_name in choices:
            ordered.append(column_name)
    return ordered
def MatchNameToUser(user_names):
    """Translate display names into usernames via ``UserNameDict``.

    Raises ``KeyError`` for a name that is not a key of ``UserNameDict``.
    """
    return [UserNameDict[display_name] for display_name in user_names]
def convert_plot_choices(plot_choices):
    """Map UI plot choices to their data columns using ``PLOT_CHOICES_DICT``."""
    columns = []
    for ui_choice in plot_choices:
        columns.append(PLOT_CHOICES_DICT[ui_choice])
    return columns
def convert_back_plot_choices(plot_choices_raw):
    """Map data columns back to UI plot choices using ``PLOT_CHOICES_REVERSE_DICT``."""
    ui_choices = []
    for column_name in plot_choices_raw:
        ui_choices.append(PLOT_CHOICES_REVERSE_DICT[column_name])
    return ui_choices
def _target_sentiment_bar(db: pd.DataFrame, title):
    """Build a stacked bar chart of sentiment shares per target.

    Tweets whose ``merged_target`` is an error code are treated as having no
    target and are left out of the bars; the chart title reports what share of
    the tweets do have a target.

    :param db: tweets of one leader; needs ``merged_target`` and ``sentiment``.
    :param title: accepted for symmetry with ``_pie_chart``; not drawn.
    :return: a matplotlib ``Figure``, or ``None`` when ``db`` is empty.
    """
    if db.empty:
        return None

    # Derive cleaned series instead of assigning into `db`: the caller passes a
    # slice of the pipeline frame, which must not be modified from here.
    targets = db['merged_target'].apply(
        lambda x: "other" if x == "ERROR_9000" or x == "ERROR_496" else x)
    # str() keeps missing (non-string) sentiment values from raising in re.sub.
    sentiments = db['sentiment'].apply(lambda x: re.sub(r'\s+', "", str(x)))

    all_targets = ['v', 'mp', 's', 'c', 'l', 'kd', 'm', 'sd', 'Red-Greens', 'The opposition']
    sentiment_colors = [('positive', 'g'), ('negative', 'r'), ('neutral', 'yellow'), ('other', 'b')]

    has_target = targets != "other"
    n_with_target = int(has_target.sum())
    percent_target = n_with_target / len(db) * 100
    # With no targeted tweets every count is zero, so dividing by 1 keeps zeros.
    denominator = n_with_target if n_with_target > 0 else 1

    font1 = {'family': 'serif', 'color': 'blue', 'size': 10}
    # A Figure created directly is not registered with pyplot, so it is
    # released once the caller drops it instead of piling up across runs.
    fig = Figure()
    ax = fig.subplots()
    bottom = np.zeros(len(all_targets))
    for sentiment, color in sentiment_colors:
        counts = np.array([
            int(((targets == target) & has_target & (sentiments == sentiment)).sum())
            for target in all_targets
        ])
        shares = counts / denominator
        ax.bar(all_targets, shares, bottom=bottom, color=color)
        bottom = bottom + shares
    ax.tick_params(axis='x', labelrotation=15)
    ax.set_ylim(0, 1)
    ax.set_title(
        f"{percent_target:.1f}%  of tweets have target. Number of tweets with target:{n_with_target}",
        loc='right', fontdict=font1)
    ax.set_ylabel("Procent")
    ax.legend([sentiment for sentiment, _ in sentiment_colors])
    return fig


def _pie_chart(db, col_name, title):
    """Build a pie chart of the value distribution of ``col_name``.

    :return: a matplotlib ``Figure``, or ``None`` when ``db`` is empty.
    """
    if db.empty:
        return None
    shares = piechart_input(db, col_name, LIMIT)
    font1 = {'family': 'serif', 'color': 'blue', 'size': 20}
    fig = Figure()  # not pyplot-managed; see _target_sentiment_bar
    ax = fig.subplots()
    ax.pie(shares['frequency'].values, labels=shares[col_name].to_list(), radius=1, autopct='%1.1f%%')
    ax.set_title(title, fontdict=font1)
    return fig


def _leader_plots(df):
    """Return one plot per (leader, plot column), leaders outermost.

    Plots are built for every entry of ``USER_NAMES`` and every key of
    ``PLOT_CHOICES_REVERSE_DICT``; ``merged_target`` is drawn as a stacked bar
    chart and the other columns as pie charts. A leader with no rows gets
    ``None`` in each of their slots.
    """
    plots = []
    for name in USER_NAMES:
        leader_df = df.loc[df["username"] == UserNameDict[name]]
        for col in PLOT_CHOICES_REVERSE_DICT:
            plot_title = col + ": " + name
            if col == 'merged_target':
                plots.append(_target_sentiment_bar(leader_df, plot_title))
            else:
                plots.append(_pie_chart(leader_df, col, plot_title))
    return plots


def _leader_tables(d_frame, save_or_no, selected_users, radio, visibility):
    """Build the per-leader example tables, tweet counters and export checkbox updates.

    :param d_frame: frame holding the tweets of all leaders.
    :param save_or_no: per-leader "export file" flags.
    :param selected_users: display names ticked in the UI.
    :param radio: per-leader radio values (keys of ``PLOT_CHOICES_DICT`` or unset).
    :param visibility: per-leader "show stats" flags.
    :return: example tables + tweet-count updates + export checkbox updates.
    """
    tables = []
    tweet_counts = []
    export_boxes = []
    for i, (name, handle) in enumerate(zip(USER_NAMES, USER_LIST)):
        shown = name in selected_users
        user_df = d_frame.loc[d_frame['username'] == handle]
        tweet_counts.append(gr.Number.update(value=len(user_df), visible=shown))
        if save_or_no[i]:
            # Nothing to write for a leader without tweets; the file name is
            # taken from the first row, which would not exist.
            if not user_df.empty:
                export_to_download(pd.DataFrame(user_df), "one_leader")
            export_boxes.append(gr.Checkbox.update(visible=shown, interactive=False))
        else:
            export_boxes.append(gr.Checkbox.update(visible=shown))
        # The radio value is unset until the user picks one; skip the table
        # then, and also when there are no rows to sample from.
        wants_table = (shown and visibility[i]
                       and radio[i] in PLOT_CHOICES_DICT
                       and not user_df.empty)
        if wants_table:
            tables.append(get_exemple_df(user_df, PLOT_CHOICES_DICT[radio[i]]))
        else:
            tables.append(None)
    return tables + tweet_counts + export_boxes


def main(From,
         To,
         UserNameChoices,
         plot_choice,
         save_selected,
         rb1, rb2, rb3, rb4, rb5, rb6, rb7, rb8,
         v1, v2, v3, v4, v5, v6, v7, v8,
         s1, s2, s3, s4, s5, s6, s7, s8
         ):
    """Run the classifier for the selected leaders and build all UI outputs.

    Relies on the module-level name ``gr``, which this file only imports
    inside its ``__main__`` section.

    :param From: start date handed to ``TextClassifier`` as ``from_date``.
    :param To: end date handed to ``TextClassifier`` as ``to_date``.
    :param UserNameChoices: display names of the selected leaders.
    :param plot_choice: selected plot categories. Kept for interface
        compatibility; plots are built for every category regardless, and
        ``update_window`` sets which ones are visible.
    :param save_selected: export the tweets of all selected leaders to one CSV.
    :param rb1..rb8: per-leader radio values choosing the example-table column.
    :param v1..v8: per-leader "show stats" flags.
    :param s1..s8: per-leader "export file" flags.
    :return: list of plots (three per leader) + one checkbox update for the
        "export selected" box + per-leader example tables + tweet-count
        updates + export checkbox updates.
    """
    radio_values = [rb1, rb2, rb3, rb4, rb5, rb6, rb7, rb8]
    df_visibility = [v1, v2, v3, v4, v5, v6, v7, v8]
    save_flags = [s1, s2, s3, s4, s5, s6, s7, s8]
    selected_names = list(UserNameChoices)
    selected_users = MatchNameToUser(selected_names)

    text_classifier = tc.TextClassifier(from_date=From, to_date=To, user_list=selected_users,
                                        num_tweets=num_tweet)
    text_classifier.run_main_pipeline()
    df = text_classifier.get_dataframe()

    # pd.concat raises on an empty list, so only export when a leader is selected.
    if save_selected and selected_users:
        per_user = [df.loc[df['username'] == user] for user in selected_users]
        export_to_download(pd.concat(per_user).reset_index(drop=True), "selected_leaders")
    # The export box is made non-interactive once it has been ticked for a run.
    save_selected_checkbox = [gr.Checkbox.update(interactive=not save_selected)]

    pycharts = _leader_plots(df)
    return pycharts + save_selected_checkbox + _leader_tables(df, save_flags, selected_names,
                                                              radio_values, df_visibility)
''' END OF MAIN | |
#### | |
##### | |
#### | |
#### | |
''' | |
def get_exemple_df(df: pd.DataFrame, column: str):
    """Return up to five example rows for every distinct value of ``column``.

    Only the columns listed in ``SELECTED_COLUMN_DICT[column]`` are kept. The
    rows are grouped by label, in the order ``value_counts`` reports the
    labels, and the original index is preserved.

    :param df: tweets to sample from.
    :param column: a key of ``SELECTED_COLUMN_DICT``.
    :return: the example rows; an empty frame with the selected columns when
        ``df`` has no labelled rows.
    """
    df = df[SELECTED_COLUMN_DICT[column]]
    samples = [df.loc[df[column] == label].head(5)
               for label in df[column].value_counts().keys()]
    if not samples:
        # pd.concat raises ValueError on an empty list.
        return df.iloc[0:0]
    return pd.concat(samples)
def export_to_download(_data_frame, _type: str):
    """Write ``_data_frame`` as a CSV file into the user's ``Downloads`` folder.

    The file is named after the first value of the ``username`` column when
    ``_type`` is ``"one_leader"`` and ``selected_leaders`` otherwise. An
    existing file is never overwritten: `` (1)``, `` (2)``, ... is appended to
    the name until it is free.

    :param _data_frame: frame to export.
    :param _type: ``"one_leader"`` or any other value for the combined export.
    :return: ``None``. Nothing is written for an empty ``"one_leader"`` frame,
        because there is no username to name the file after.
    """
    downloads_dir = Path.home() / "Downloads"
    if _type == "one_leader":
        usernames = _data_frame['username'].to_list()
        if not usernames:
            return
        file_name = usernames[0]
    else:
        file_name = "selected_leaders"

    # The folder may not exist yet.
    downloads_dir.mkdir(parents=True, exist_ok=True)

    # Count on the file name only, so digits elsewhere in the path (home
    # directory, username) are never rewritten.
    target = downloads_dir / f"{file_name}.csv"
    counter = 0
    while target.exists():
        counter += 1
        target = downloads_dir / f"{file_name} ({counter}).csv"
    _data_frame.to_csv(target, index=False)
# , pie_chart(df, "main_topic"), pie_chart("target") | |
def piechart_input(df, column, limit):
    """Summarise ``df[column]`` as label/frequency pairs for a pie chart.

    * ``"merged_target"``: the error codes ``ERROR_9000`` and ``ERROR_496``
      are renamed to ``"other"`` and the raw counts are returned as they are.
    * ``"sentiment"``: whitespace is stripped and every value other than
      positive/negative/neutral (case-insensitive) is renamed to ``"other"``.
    * any other column: values are counted unchanged.

    For the last two cases, labels whose share of ``len(df)`` is below
    ``limit`` are merged into the ``"other"`` row, which is always present.

    :param df: source frame.
    :param column: column to summarise.
    :param limit: minimum share (0..1) for a label to get its own slice.
    :return: ``pd.DataFrame`` with the columns ``column`` and ``"frequency"``.
    """
    df_len = len(df)

    if column == "merged_target":
        cleaned = df[column].apply(lambda x: "other" if x == "ERROR_9000" or x == "ERROR_496" else x)
        counts = cleaned.value_counts()
        return pd.DataFrame.from_dict({column: counts.keys().to_list(),
                                       "frequency": counts.to_list()})

    if column == "sentiment":
        known = ("positive", "negative", "neutral")
        stripped = df[column].apply(lambda x: re.sub(r"\s+", "", str(x)))
        counts = stripped.apply(lambda x: x if x.lower() in known else "other").value_counts()
    else:
        counts = df[column].value_counts()

    freq_dict = {column: [], "frequency": []}
    freq_other = 0
    for label, freq in zip(counts.keys().to_list(), counts.to_list()):
        if freq / df_len < limit:
            freq_other += freq
        else:
            freq_dict[column].append(label)
            freq_dict["frequency"].append(freq)

    # Fold the small categories into an existing "other" row when there is one.
    if "other" in freq_dict[column]:
        freq_dict["frequency"][freq_dict[column].index("other")] += freq_other
    else:
        freq_dict[column].append("other")
        freq_dict["frequency"].append(freq_other)
    return pd.DataFrame.from_dict(freq_dict)
def convert_to_boolean(leaders, plot_choices):
    """Turn the selections into boolean masks over ``USER_NAMES`` and ``CHOICE_LIST``.

    :return: ``(leader_mask, plot_mask)``; each entry states whether the
        corresponding name / choice is contained in the given selection.
    """
    leader_mask = [name in leaders for name in USER_NAMES]
    plot_mask = [option in plot_choices for option in CHOICE_LIST]
    return leader_mask, plot_mask
def update_window(leaders: list, plot_choices: list,
                  v1, v2, v3, v4, v5, v6, v7, v8
                  ):
    """Compute visibility updates for every per-leader component.

    :param leaders: display names ticked in the UI.
    :param plot_choices: plot categories ticked in the UI.
    :param v1..v8: per-leader "show stats" flags.
    :return: updates in the order rows (three per leader), plots, radios,
        tweet counters, "show stats" checkboxes, export checkboxes, tables.
    """
    leader_flags, plot_flags = convert_to_boolean(leaders, plot_choices)
    stats_flags = [v1, v2, v3, v4, v5, v6, v7, v8]

    # A plot is visible only when both its leader and its category are selected.
    plot_flags_per_leader = [shown and wanted for shown in leader_flags for wanted in plot_flags]

    row_updates = []
    table_updates = []
    counter_updates = []
    radio_updates = []
    stats_box_updates = []
    export_box_updates = []
    for idx, shown in enumerate(leader_flags):
        # The table (and the row around it) follows the leader's own
        # "show stats" flag, but only while the leader is selected.
        table_visible = stats_flags[idx] if shown else False
        row_updates.append(gr.Row.update(visible=shown))
        row_updates.append(gr.Row.update(visible=shown))
        row_updates.append(gr.Row.update(visible=table_visible))
        table_updates.append(gr.DataFrame.update(visible=table_visible))
        counter_updates.append(gr.Number.update(visible=shown))
        radio_updates.append(gr.Radio.update(visible=shown))
        stats_box_updates.append(gr.Checkbox.update(visible=shown))
        export_box_updates.append(gr.Checkbox.update(visible=shown))

    plot_updates = [gr.Plot.update(visible=flag) for flag in plot_flags_per_leader]

    return (row_updates + plot_updates + radio_updates + counter_updates
            + stats_box_updates + export_box_updates + table_updates)
def add_plots(user):
    """Create one hidden ``gr.Plot`` per key of ``PLOT_CHOICES_DICT`` for ``user``."""
    return [gr.Plot(label=plot_type + " for " + user, visible=False)
            for plot_type in PLOT_CHOICES_DICT]
def add_nbr_boxes():
    """Create one hidden ``gr.Number`` (initial value 0) per entry of ``USER_NAMES``."""
    boxes = []
    for user in USER_NAMES:
        boxes.append(gr.Number(value=0, label='Tweets by ' + user, visible=False))
    return boxes
if __name__ == "__main__":
    import gradio as gr

    # NOTE(review): the nesting of the `with` blocks below was reconstructed
    # from a copy of this file that had lost its indentation; confirm the
    # layout against the running app.
    demo = gr.Blocks(title='Politweet')
    with demo:
        with gr.Column():
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        date1 = gr.Textbox(label="From", value='2022-05-10')
                        date2 = gr.Textbox(label="To", value='2022-05-30')
                    # Same component class as `plot_choices`, spelled the same way.
                    leaders = gr.CheckboxGroup(choices=USER_NAMES,
                                               label="")
                    plot_choices = gr.CheckboxGroup(choices=CHOICE_LIST, label='Choose what to show')
                    save_selected_data_checkbox = gr.Checkbox(label="Export selected data")
                    with gr.Row():
                        update = gr.Button('Apply')
                        btn = gr.Button("Run")
                    # show_stat = gr.Checkbox(label="Show full statistics", value=True)
                    # show_plots = gr.components.Checkbox(label='Show topics', value=True)
                with gr.Column():
                    selected = gr.DataFrame(label="Summary statistics for the selected choices",
                                            max_rows=None, visible=False)
                    # all_data = gr.components.DataFrame(label="Summary statistics of the total database",
                    #                                    max_rows=None)
            plots = []
            radio_list = []
            checkbox_list = []
            df_list = []
            block_list = []
            saving_file_checkboxes = []
            nr_tweet_list = []
            with gr.Column():
                # Three row slots per leader, matching the three row updates
                # that update_window emits per leader.
                # NOTE(review): each group of three is the same Row object
                # repeated, and these are not the rows that hold the
                # components created below — confirm this is intended.
                for _ in USER_NAMES:
                    block_list += [gr.Row()] * 3
                for leader in USER_NAMES:
                    with gr.Row():
                        plots += add_plots(leader)
                    with gr.Row():
                        radio_list.append(gr.Radio(list(PLOT_CHOICES_DICT.keys()), visible=False, interactive=True))
                        nr_tweet_list.append(gr.Number(visible=False))
                        checkbox_list.append(gr.Checkbox(label="Show stats ", value=False, visible=False))
                        saving_file_checkboxes.append(gr.Checkbox(label="Export file", value=False, visible=False))
                    with gr.Row():
                        df_list.append(gr.DataFrame(visible=False))

        # Positional arguments of main(): dates, leaders, plot choices, export
        # flag, then the per-leader radios, "show stats" and export checkboxes.
        inp = [date1,
               date2,
               leaders,
               plot_choices, save_selected_data_checkbox] + radio_list + checkbox_list + saving_file_checkboxes
        # Same order as the list main() returns.
        output = plots + [save_selected_data_checkbox] + df_list + nr_tweet_list + saving_file_checkboxes
        # Same order as the list update_window() returns.
        all_visual = block_list + plots + radio_list + nr_tweet_list + checkbox_list + saving_file_checkboxes + df_list
        update_inp = [leaders, plot_choices] + checkbox_list
        update.click(fn=update_window, inputs=update_inp, outputs=all_visual)
        btn.click(fn=main, inputs=inp, outputs=output)
        # input.change(fn=main, inputs=input, outputs=output)
    demo.launch(share=False)
    # df= pd.read_csv(os.getcwd()+"/data/twitterdata.csv")
    # https://51285.gradio.app