"""Politweet: Gradio dashboard for tweet statistics of party leaders.

The module reads ``<ROOT_PATH>/data/twitterdata.csv`` and, for every leader in
``USER_NAMES``, builds three figures (topic, sentiment and target) plus an
optional example table and CSV export.
"""
import glob
from pathlib import Path

import gradio as gr
import numpy as np
import pandas as pd
import regex as re
from matplotlib import pyplot as plt, use as plt_use

from textclassifier import TextClassifier as tc

plt_use('Agg')

TOPIC = "merged_topic"
# Columns shown in the example table for each plot column.
SELECTED_COLUMN_DICT = {
    TOPIC: ['tweet', 'main_topic', 'sub_topic', 'synonym_topic', 'cos_sim_topic', 'merged_topic'],
    'sentiment': ['tweet', 'sentiment'],
    'merged_target': ['tweet', 'target', 'synonym_target', 'cos_sim_target', 'merged_target'],
}

# Twitter handles and display names; the two lists are index-aligned.
USER_LIST = ['jimmieakesson', 'BuschEbba', 'annieloof', 'JohanPehrson', 'bolund', 'martastenevi', 'SwedishPM',
             'dadgostarnooshi']
USER_NAMES = ['Jimmie Åkesson', 'Ebba Busch', 'Annie Lööf', 'Johan Pehrson', 'Per Bolund', 'Märta Stenevi',
              'Magdalena Andersson', 'Nooshi Dadgostar']

CHOICE_LIST = ['Topic', 'Sentiment', 'Target']
# The plots use the merged topic/target columns (previously 'sub_topic' and 'target').
PLOT_CHOICES_DICT = {'Topic': TOPIC, 'Sentiment': 'sentiment', 'Target': 'merged_target'}
PLOT_CHOICES_REVERSE_DICT = {TOPIC: 'Topic', 'sentiment': 'Sentiment', 'merged_target': 'Target'}

# Display name -> Twitter handle.
UserNameDict = dict(zip(USER_NAMES, USER_LIST))

Columns = ['username', 'nlikes', 'nreplies', 'nretweets', 'main_topic', 'sub_topic', 'sentiment', 'target', 'tweet',
           'date', 'urls', 'id', 'class_tuple', 'user_id']

# Categories with a share below this limit are folded into "other"/"Other" in the pie charts.
LIMIT = 0.04

# Classifier error codes that are treated as "no label".
_ERROR_LABELS = ("ERROR_9000", "ERROR_496")


def show_all_stats(see_full_stats):
    """Return the full tweet table if ``see_full_stats`` is truthy, else an empty DataFrame."""
    if not see_full_stats:
        # Skip the file read entirely when the data is not going to be shown.
        return pd.DataFrame()
    return pd.read_csv("{}/data/twitterdata.csv".format(tc.ROOT_PATH))


def fix_choices_correct_order(choices):
    """Return the entries of ``choices`` that are known columns, in ``Columns`` order."""
    return [column for column in Columns if column in choices]


def match_name_lower_case(user_names):
    """Translate display names into Twitter handles.

    :param user_names: iterable of names from ``USER_NAMES``.
    :return: list of the corresponding handles from ``USER_LIST``.
    :raises KeyError: if a name is not in ``UserNameDict``.
    """
    return [UserNameDict[name] for name in user_names]


def convert_plot_choices(plot_choices):
    """Map UI plot choices ('Topic', ...) to dataframe column names."""
    return [PLOT_CHOICES_DICT[choice] for choice in plot_choices]


def convert_back_plot_choices(plot_choices_raw):
    """Map dataframe column names back to UI plot choices."""
    return [PLOT_CHOICES_REVERSE_DICT[choice] for choice in plot_choices_raw]


def bar(db: pd.DataFrame, title):
    """
    Build a stacked bar diagram with the sentiment distribution for each target.

    NOTE: The tweets without any target are not shown in the plot, we just show distribution of tweets that
    have a target.

    :param db: tweets of one leader; needs the columns 'merged_target' and 'sentiment'.
    :param title: kept for interface compatibility; the plot title is generated from the data.
    :return: the matplotlib figure, or None if ``db`` is empty.
    """
    if db.empty:
        return None

    # Work on a copy: ``db`` is usually a slice of the caller's frame and must not be modified.
    db = db.copy()
    # Classifier error codes count as "other" (= no target).
    db['merged_target'] = db['merged_target'].apply(lambda x: "other" if x in _ERROR_LABELS else x)
    # Strip stray whitespace from the sentiment labels; str() keeps missing values from crashing re.sub.
    db['sentiment'] = db['sentiment'].apply(lambda x: re.sub(r'\s+', "", str(x)))

    all_targets = ['v', 'mp', 's', 'c', 'l', 'kd', 'm', 'sd', 'Red-Greens', 'The opposition']
    sentiments = ['positive', 'negative', 'neutral', 'other']

    with_target = db.loc[db["merged_target"] != "other"]
    percent_target = (len(with_target) / len(db)) * 100

    counts = {sentiment: [0] * len(all_targets) for sentiment in sentiments}
    for i, target in enumerate(all_targets):
        target_sentiments = with_target.loc[with_target["merged_target"] == target, 'sentiment'].to_list()
        for sentiment in sentiments:
            counts[sentiment][i] = target_sentiments.count(sentiment)

    # Normalise by the number of tweets with a target; all counts are zero when there are none.
    denominator = len(with_target) if len(with_target) > 0 else 1
    y1 = np.array(counts['positive']) / denominator
    y2 = np.array(counts['negative']) / denominator
    y3 = np.array(counts['neutral']) / denominator
    y4 = np.array(counts['other']) / denominator

    font1 = {'family': 'serif', 'color': 'blue', 'size': 10}
    fig = plt.figure()
    plt.bar(all_targets, y1, color='g')
    plt.bar(all_targets, y2, bottom=y1, color='r')
    plt.bar(all_targets, y3, bottom=(y1 + y2), color='yellow')
    plt.bar(all_targets, y4, bottom=(y1 + y2 + y3), color='b')
    plt.xticks(rotation=15)
    plt.ylim(0, 1)
    plt.title(
        str(percent_target)[0:4] + "% " + " of tweets have target. " + "Number of tweets with target:" + str(
            len(with_target)), loc='right', fontdict=font1)
    plt.ylabel("Procent")
    plt.legend(["positive", "negative", "neutral", "other"])
    return fig


def pie_chart(db, col_name, title):
    """
    Build a pie chart of the value frequencies of one column.

    :param db: tweets of one leader.
    :param col_name: column whose values are counted (grouped by ``pie_chart_input``).
    :param title: title of the figure.
    :return: the matplotlib figure, or None if ``db`` is empty.
    """
    if db.empty:
        return None

    frequencies = pie_chart_input(db, col_name, LIMIT)
    labels = frequencies[col_name].to_list()
    sizes = frequencies['frequency'].values

    font1 = {'family': 'serif', 'color': 'blue', 'size': 20}
    fig = plt.figure()
    plt.pie(sizes, labels=labels, radius=1, autopct='%1.1f%%')
    plt.title(title, fontdict=font1)
    return fig


def nested_pie_chart(df):
    """
    Build a nested pie chart: topics in the outer ring, their sentiment split in the inner ring.

    Topics whose share of the tweets is not above ``LIMIT`` are merged into a final 'Other' slice.

    :param df: tweets of one leader; needs the ``TOPIC`` and 'sentiment' columns.
    :return: the matplotlib figure, or None if ``df`` is empty.
    """
    if df.empty:
        return None

    sent_dict = {'positive': 0, 'negative': 1, 'neutral': 2, 'other': 3}
    tot_sum = len(df)

    # topic -> [positive, negative, neutral, other] counts
    count_dict = {}
    for topic, sentiment in zip(df[TOPIC], df['sentiment']):
        if sentiment not in sent_dict:
            sentiment = 'other'
        count_dict.setdefault(topic, [0, 0, 0, 0])[sent_dict[sentiment]] += 1

    count_list = []
    labels = []
    other_list = np.array([0, 0, 0, 0])
    for topic, topic_counts in count_dict.items():
        if np.sum(topic_counts) / tot_sum > LIMIT:
            count_list.append(topic_counts)
            labels.append(topic)
        else:
            other_list += np.array(topic_counts)
    # The 'Other' slice is always added last, even when it is empty.
    count_list.append(list(other_list))
    labels.append('Other')

    fig, ax = plt.subplots()
    size = 0.3
    vals = np.array(count_list)
    # One colour per sentiment, in sent_dict order; matplotlib cycles the list if it is too short.
    inner_colors = ['green', 'red', 'blue', 'yellow'] * len(count_dict)

    if vals.shape[0] != 0:
        ax.pie(vals.sum(axis=1), radius=1, labels=labels, pctdistance=0.9,
               wedgeprops=dict(width=size, edgecolor='w'))
        ax.pie(vals.flatten(), radius=1 - size, colors=inner_colors,
               wedgeprops=dict(width=size, edgecolor='w'))
    ax.set(aspect='equal', title='Topic (outer circle) and its corresponding sentiment (inner circle)')
    return fig


def add_pie_chart(df, leaders, plot_choices):
    """
    Build every figure for every leader.

    Figures are created for all names in ``USER_NAMES`` and all plot columns, in that nesting order;
    leaders without tweets in ``df`` yield None entries.

    :param df: the filtered tweet table.
    :param leaders: selected leader names (unused; kept for interface compatibility).
    :param plot_choices: selected plot columns (unused; kept for interface compatibility).
    :return: list with ``len(USER_NAMES) * len(PLOT_CHOICES_REVERSE_DICT)`` figures or None values.
    """
    figures = []
    for user in USER_NAMES:
        user_df = df.loc[df["username"] == UserNameDict[user]]
        for col in PLOT_CHOICES_REVERSE_DICT:
            if col == 'merged_target':
                figures.append(bar(user_df, col + ": " + user))
            elif col == 'sentiment':
                figures.append(pie_chart(user_df, col, col + ": " + user))
            elif col == TOPIC:
                figures.append(nested_pie_chart(user_df))
    return figures


def get_selected_df_list(d_frame, save_or_no, selected_users, radio, visibility):
    """
    Build the per-leader example tables, tweet counters and export-checkbox updates.

    :param d_frame: the filtered tweet table.
    :param save_or_no: one flag per leader; a truthy flag exports that leader's tweets to a CSV file.
    :param selected_users: display names of the selected leaders.
    :param radio: one radio value per leader ('Topic', 'Sentiment', 'Target' or None).
    :param visibility: one flag per leader telling whether the example table is shown.
    :return: example tables (or None) + ``gr.Number`` updates + ``gr.Checkbox`` updates, concatenated.
    """
    df_list = []
    number_tweets = []
    save_file_components_list = []
    for i, leader in enumerate(USER_NAMES):
        u_bool = leader in selected_users
        user_df = d_frame.loc[d_frame['username'] == USER_LIST[i]]
        number_tweets.append(gr.Number.update(value=len(user_df), visible=u_bool))

        # Only export when there is something to export; an empty frame has no username to name the file.
        if save_or_no[i] and not user_df.empty:
            export_to_download(pd.DataFrame(user_df), "one_leader")
            save_file_components_list.append(gr.Checkbox.update(visible=u_bool, interactive=False))
        else:
            save_file_components_list.append(gr.Checkbox.update(visible=u_bool))

        # radio[i] is not a valid key (e.g. None) until the user has picked a statistic.
        if u_bool and visibility[i] and radio[i] in PLOT_CHOICES_DICT:
            df_list.append(get_example_df(user_df, PLOT_CHOICES_DICT[radio[i]]))
        else:
            df_list.append(None)
    return df_list + number_tweets + save_file_components_list


def main(from_date, to_date, usr_name_choices, plot_choice, save_selected,
         rb1, rb2, rb3, rb4, rb5, rb6, rb7, rb8,
         v1, v2, v3, v4, v5, v6, v7, v8,
         s1, s2, s3, s4, s5, s6, s7, s8
         ):
    """
    Callback of the "Run" button: filter the tweets and build all outputs.

    :param from_date: lower bound (inclusive) compared against the 'date' column.
    :param to_date: upper bound (inclusive) compared against the 'date' column.
    :param usr_name_choices: selected leader display names.
    :param plot_choice: selected plot types ('Topic', 'Sentiment', 'Target').
    :param save_selected: if truthy, export the tweets of all selected leaders to one CSV file.
    :param rb1..rb8: radio-button value per leader (which statistic to show in the example table).
    :param v1..v8: "Show stats" checkbox value per leader.
    :param s1..s8: "Export file" checkbox value per leader.
    :return: figures + update for the "export selected" checkbox + the list from ``get_selected_df_list``.
    """
    # One flag per leader: export that leader's dataframe to a file.
    save_file_bool = s1, s2, s3, s4, s5, s6, s7, s8
    rb_components = [rb1, rb2, rb3, rb4, rb5, rb6, rb7, rb8]  # radio buttons
    df_visibility_check = [v1, v2, v3, v4, v5, v6, v7, v8]

    # NOTE: the data is read from the CSV file instead of running the tc.TextClassifier pipeline live.
    dataframe = pd.read_csv("{}/data/twitterdata.csv".format(tc.ROOT_PATH))

    # Choose the subset between from_date and to_date written by one of the selected users.
    selected_handles = match_name_lower_case(usr_name_choices)
    df = dataframe.loc[(dataframe['date'] >= from_date)
                       & (dataframe['date'] <= to_date)
                       & (dataframe['username'].isin(selected_handles))].copy()
    df.sort_values(by=['date'], inplace=True)

    # Remove tweets that start with '@'. Comparing with False (instead of negating) also drops rows
    # whose tweet is missing, because NaN == False is False.
    starts_with_at = df['tweet'].str.startswith('@')
    df = df[starts_with_at == False].copy()  # noqa: E712

    # Relabel the topic: classifier error codes become "N/A" and the topic 's' becomes "The Government".
    topic_renames = {"ERROR_9000": "N/A", "ERROR_496": "N/A", "s": "The Government"}
    df[TOPIC] = df[TOPIC].apply(lambda x: topic_renames.get(x, x))

    if save_selected:
        frames = [df.loc[df['username'] == user] for user in selected_handles]
        # pd.concat raises on an empty list, which happens when no leader is selected.
        selected_df = pd.concat(frames).reset_index(drop=True) if frames else df.reset_index(drop=True)
        export_to_download(selected_df, "selected_leaders")
        save_selected_checkbox = [gr.Checkbox.update(interactive=False)]
    else:
        save_selected_checkbox = [gr.Checkbox.update(interactive=True)]

    pie_charts = add_pie_chart(df, usr_name_choices, convert_plot_choices(plot_choice))

    return pie_charts + save_selected_checkbox + get_selected_df_list(df, save_file_bool, list(usr_name_choices),
                                                                      rb_components, df_visibility_check)


def get_example_df(df: pd.DataFrame, column: str):
    """
    Return up to five example rows for every distinct value of ``column``.

    :param df: tweets of one leader.
    :param column: a key of ``SELECTED_COLUMN_DICT``; it selects the shown columns and the grouping column.
    :return: DataFrame with the example rows, most frequent value first; empty if ``df`` has no values.
    """
    df = df[SELECTED_COLUMN_DICT[column]]
    examples = [df.loc[df[column] == label].head(5) for label in df[column].value_counts().keys()]
    if not examples:
        # pd.concat raises on an empty list; return an empty frame with the right columns instead.
        return df.iloc[0:0]
    return pd.concat(examples)


def export_to_download(_data_frame, _type: str):
    """
    Write ``_data_frame`` as CSV into the user's ``Downloads`` folder without overwriting existing files.

    The file is called ``<username>.csv`` for ``_type == "one_leader"`` (username taken from the first row)
    and ``selected_leaders.csv`` otherwise. If the name is taken, `` (1)``, `` (2)``, ... is appended.

    :param _data_frame: the data to export.
    :param _type: "one_leader" or anything else for the combined export.
    """
    if _type == "one_leader":
        if _data_frame.empty:
            # Nothing to export and no username available to derive the file name from.
            return
        file_name = _data_frame['username'].iloc[0]
    else:
        file_name = "selected_leaders"

    downloads_dir = Path.home() / "Downloads"
    downloads_dir.mkdir(parents=True, exist_ok=True)

    # Count up until a free name is found. The counter belongs to the file name only, so digits
    # elsewhere in the path (e.g. in the home directory) are never touched.
    full_path = downloads_dir / (file_name + ".csv")
    counter = 0
    while full_path.exists():
        counter += 1
        full_path = downloads_dir / "{} ({}).csv".format(file_name, counter)
    _data_frame.to_csv(full_path, index=False)


def pie_chart_input(df, column, limit):
    """
    Count the values of ``column`` and prepare them as input for a pie chart.

    * 'sentiment': whitespace is removed and everything but positive/negative/neutral becomes "other".
    * 'merged_target': error codes become "other"; the counts are returned without further grouping.
    * any other column: plain value counts.

    Except for 'merged_target', values whose share is below ``limit`` are added to the "other" entry.

    :param df: the tweets to count.
    :param column: the column to count.
    :param limit: minimum share (0..1) a value needs to get its own slice.
    :return: DataFrame with the columns ``column`` and 'frequency'.
    """
    df_len = len(df)
    if column == "sentiment":
        known_sentiments = ("positive", "negative", "neutral")
        ds_sentiment = df[column].apply(lambda x: re.sub(r"\s+", "", str(x)))
        df_v = ds_sentiment.apply(
            lambda x: x if str(x).lower() in known_sentiments else "other").value_counts()
    elif column == "merged_target":
        ds_target = df[column].apply(lambda x: "other" if x in _ERROR_LABELS else x)
        df_v = ds_target.value_counts()
        return pd.DataFrame.from_dict({column: df_v.keys().to_list(), "frequency": df_v.to_list()})
    else:
        df_v = df[column].value_counts()

    freq_dict = {column: [], "frequency": []}
    freq_other = 0
    for label, freq in zip(df_v.keys().to_list(), df_v.to_list()):
        if freq / df_len < limit:
            freq_other += freq
        else:
            freq_dict[column].append(label)
            freq_dict["frequency"].append(freq)

    # Add the small categories to an existing "other" entry, or create one.
    if "other" in freq_dict[column]:
        freq_dict["frequency"][freq_dict[column].index("other")] += freq_other
    else:
        freq_dict[column].append("other")
        freq_dict["frequency"].append(freq_other)
    return pd.DataFrame.from_dict(freq_dict)


def convert_to_boolean(leaders, plot_choices):
    """
    Turn the selections into boolean masks.

    :return: tuple (mask over ``USER_NAMES``, mask over ``CHOICE_LIST``).
    """
    leaders_converted = [leader in leaders for leader in USER_NAMES]
    plot_converted = [choice in plot_choices for choice in CHOICE_LIST]
    return leaders_converted, plot_converted


def update_window(leaders: list, plot_choices: list,
                  v1, v2, v3, v4, v5, v6, v7, v8
                  ):
    """
    Callback of the "Apply" button: show or hide the components of each leader.

    :param leaders: selected leader display names.
    :param plot_choices: selected plot types.
    :param v1..v8: "Show stats" checkbox value per leader.
    :return: component updates in the order
        blocks + plots + radio + nr_tweet + checkbox + save_file_checkboxes + dataframes.
    """
    leader_bool_list, plot_bool_list = convert_to_boolean(leaders, plot_choices)
    df_visibility_bool = [v1, v2, v3, v4, v5, v6, v7, v8]

    # A plot is visible if both its leader and its plot type are selected.
    update_plots = [gr.Plot.update(visible=leader and choice)
                    for leader in leader_bool_list
                    for choice in plot_bool_list]

    update_blocks = []
    update_radio = []
    update_nr_tweet = []
    update_checkbox = []
    update_save_file_checkboxes = []
    update_df = []
    for i, vis_or_not in enumerate(leader_bool_list):
        # The example table needs a selected leader AND a ticked "Show stats" checkbox.
        show_df = df_visibility_bool[i] if vis_or_not else False
        # Three rows per leader: plots, controls, example table.
        update_blocks.append(gr.Row.update(visible=vis_or_not))
        update_blocks.append(gr.Row.update(visible=vis_or_not))
        update_blocks.append(gr.Row.update(visible=show_df))
        update_df.append(gr.DataFrame.update(visible=show_df))
        update_nr_tweet.append(gr.Number.update(visible=vis_or_not))
        update_radio.append(gr.Radio.update(visible=vis_or_not))
        update_checkbox.append(gr.Checkbox.update(visible=vis_or_not))
        update_save_file_checkboxes.append(gr.Checkbox.update(visible=vis_or_not))

    return (update_blocks + update_plots + update_radio + update_nr_tweet + update_checkbox
            + update_save_file_checkboxes + update_df)


def add_plots(user):
    """Create one hidden ``gr.Plot`` per plot type for ``user``."""
    return [gr.Plot(label=plot_type + " for " + user, visible=False) for plot_type in PLOT_CHOICES_DICT]


def add_nbr_boxes():
    """Create one hidden ``gr.Number`` box per leader."""
    return [gr.Number(value=0, label='Tweets by ' + user, visible=False) for user in USER_NAMES]


if __name__ == "__main__":
    demo = gr.Blocks(title='Politweet')

    with demo:
        with gr.Column():
            instructions = gr.Markdown(
                'How to use Politweet: \n\nFirst select the time interval you want to analyze, '
                'then the party leaders you want to focus on. After this you pick which '
                'statistics you want to see and click "Apply" to apply your choices. When '
                'you see empty boxes appearing, you can click "Run" to run the program.\n\nYou should now be able to '
                'see the visualisations of the collected statistics. Under the visualisations there are a few fields '
                'and buttons. If you want to see the full presentation of a specific statistic you can press one of '
                'the radio buttons, click "Show stats" and then once again press "Apply" and "Run" at the top. This '
                'presents a dataframe that accounts for the whole classification process. For "Topic" and "Target" '
                'there is a column called cos_sim_... which says how similar the algorithm found the classification '
                'and "synonym_..." to be. The field "Number" tells how many tweets the party leader made during the '
                'selected time period. If you do not see any tweets for a leader you might have to increase the time '
                'span.')
        with gr.Row():
            with gr.Column():
                with gr.Row():
                    date1 = gr.Textbox(label="from_date", value='2022-01-01')
                    date2 = gr.Textbox(label="to_date", value='2022-05-31')
                leaders = gr.CheckboxGroup(choices=USER_NAMES, label="")
                plot_choices = gr.CheckboxGroup(choices=CHOICE_LIST, label='Choose what to show')
                save_selected_data_checkbox = gr.Checkbox(label="Export selected data")
                with gr.Row():
                    update = gr.Button('Apply')
                    btn = gr.Button("Run")

            with gr.Column():
                selected = gr.DataFrame(label="Summary statistics for the selected choices",
                                        max_rows=None, visible=False)

        plots = []
        radio_list = []
        checkbox_list = []
        df_list = []
        block_list = []
        saving_file_checkboxes = []
        nr_tweet_list = []
        with gr.Column():
            # NOTE(review): these are placeholder rows (the same Row object three times per leader), not
            # the rows that hold the components created below - confirm whether update_window is meant
            # to toggle the real rows instead.
            for _ in USER_NAMES:
                block_list += [gr.Row()] * 3

            for leader in USER_NAMES:
                with gr.Row():
                    plots += add_plots(leader)
                with gr.Row():
                    radio_list.append(gr.Radio(list(PLOT_CHOICES_DICT.keys()), visible=False, interactive=True))
                    nr_tweet_list.append(gr.Number(visible=False))
                    checkbox_list.append(gr.Checkbox(label="Show stats ", value=False, visible=False))
                    saving_file_checkboxes.append(gr.Checkbox(label="Export file", value=False, visible=False))
                with gr.Row():
                    df_list.append(gr.DataFrame(visible=False))

        # The order of these lists has to match the parameters/return values of main and update_window.
        inp = ([date1, date2, leaders, plot_choices, save_selected_data_checkbox]
               + radio_list + checkbox_list + saving_file_checkboxes)
        output = plots + [save_selected_data_checkbox] + df_list + nr_tweet_list + saving_file_checkboxes
        all_visual = (block_list + plots + radio_list + nr_tweet_list + checkbox_list
                      + saving_file_checkboxes + df_list)
        update_inp = [leaders, plot_choices] + checkbox_list

        update.click(fn=update_window, inputs=update_inp, outputs=all_visual)
        btn.click(fn=main, inputs=inp, outputs=output)

    demo.launch(share=True)