# NOTE: this file was captured from a Hugging Face Space page whose status banner read "Runtime error".
import os
import random
from os import listdir
from os.path import isfile, join, exists

import matplotlib.image as mpimg
import numpy as np
import pandas as pd
import plotly.express as px
import printj
import streamlit as st
import xlsxwriter
# import cv2

from .lib import set_input, create_dowload_button
class LogAnalyser:
    """Streamlit view that classifies conversation logs and reviews emote criteria.

    The analyser reads a log file (lines starting with ``H:`` or ``U:``), asks
    ``gen`` for the emotion of every sentence, caches the result as a CSV under
    ``data/df`` and renders per-story tables plus, in debug mode, two
    threshold-analysis plots.

    NOTE(review): the nesting of this class was reconstructed from a
    whitespace-mangled copy of the file; confirm the loop/branch structure
    against the original before relying on subtle behaviour.
    """

    # Columns that are only filled for rows where the emote probability was
    # actually evaluated (non-neutral emotion with a high enough score).
    _PE_COLUMNS = ["reaction_frequency", "probability_emote", "random_value", "reaction_show"]

    # Number of leading stories used by the debug threshold-analysis plots.
    _NUM_ANALYSIS_STORIES = 9

    def __init__(self, gen, container_guide, container_param, container_button):
        """Store the collaborators and let the user pick a log file.

        Args:
            gen: object exposing ``initialise_classifier_model()`` and
                ``get_emotion(sentence, filter_by=...)`` (used by ``get_log``).
            container_guide: Streamlit container kept for callers; not used
                inside this class.
            container_param: Streamlit container receiving the parameter widgets.
            container_button: Streamlit container receiving the action button.
        """
        self.gen = gen
        self.container_guide = container_guide
        self.container_param = container_param
        self.container_button = container_button

        dirpath = 'data'
        log_file_paths = sorted(
            join(dirpath, f)
            for f in listdir(dirpath)
            if isfile(join(dirpath, f)) and f.startswith('ist_log')
        )
        self.path = container_param.selectbox(
            'Select the log path', log_file_paths)
        # Cache file shares the log's base name: data/ist_log_x.txt -> data/df/ist_log_x.csv
        self.df_path = f'data/df/{self.path.split("/")[-1].split(".")[0]}.csv'
        st.markdown(self.get_text())
        # Filled with st.empty() slots for the debug plots in display_logs().
        self.placeholder = dict()

    @staticmethod
    def get_text():
        """Return the markdown snippet that documents the emote equation."""
        return (
            '\n'
            '### Equation\n'
            '```\n'
            'frequency_penalty = 1 - emotion_frequency\n'
            'probability_emote = w * emotion_confidence + (1 - w) * frequency_penalty\n'
            'Show_Emotion = probability_emote > (Random value between 0 and 1)\n'
            '```\n'
        )

    def display_logs(self):
        """Render the whole page: load/classify the log, show tables and debug plots."""
        self.emotion_type = self.container_param.select_slider(
            'How many Emotion data to show?', ['Max-only', '2', '3', '4', '5', '6', 'All 7'])
        self.debug = 'debug' in self.df_path
        # Keep this operand order: the button is only rendered when the cache
        # file already exists, and debug logs are never re-classified.
        if (not exists(self.df_path) or self.container_button.button('Detect Emotion')) and (not self.debug):
            self.get_log()
        # Always reload from the CSV: the saved index comes back as the
        # 'Unnamed: 0' column that update_df() hides again.
        self.df = pd.read_csv(self.df_path)
        self.update_df()
        if self.debug:
            for name in ['c1plot', 'c2plot']:
                # Show a pre-rendered image first; the get_*_plot calls below
                # replace it with the live chart once it has been computed.
                self.placeholder[name] = st.empty()
                image = mpimg.imread(f'data/img/{name}.png')
                self.placeholder[name].image(image)
            self.get_c1_plot()
            self.get_c2_plot()

    def _analysis_story_rows(self):
        """Return the user-turn rows of the first stories, one DataFrame per story."""
        story_ids = self.df.Story.unique()[:self._NUM_ANALYSIS_STORIES]
        return [self.df[self.get_subset_condition(self.df, story_id)] for story_id in story_ids]

    def _iter_c1_decisions(self, story_rows, reaction_weight, fixed_threshold=None):
        """Walk one story's user rows and decide, row by row, whether to emote.

        Args:
            story_rows: DataFrame with the user rows of a single story.
            reaction_weight: weight ``w`` applied to the emotion score.
            fixed_threshold: threshold to compare against; when ``None`` a new
                ``random.random()`` value is drawn for every evaluated row.

        Yields:
            ``(index, row, shown, details)`` where ``details`` is ``None`` for
            rows that were skipped (neutral emotion or score below
            ``self.score_threshold``) and otherwise the tuple
            ``(reaction_frequency, probability_emote, threshold)``.
        """
        reaction_num = 0
        for turn, (index, row) in enumerate(story_rows.iterrows()):
            if row.Emotion == 'neutral' or row.Score < self.score_threshold:
                yield index, row, False, None
                continue
            reaction_frequency = reaction_num / (turn + 1)
            probability_emote = row.Score * reaction_weight + \
                (1 - reaction_weight) * (1 - reaction_frequency)
            # `is None` (not truthiness): a fixed threshold of 0.0 is valid.
            threshold = random.random() if fixed_threshold is None else fixed_threshold
            shown = bool(probability_emote > threshold)
            if shown:
                reaction_num += 1
            yield index, row, shown, (reaction_frequency, probability_emote, threshold)

    def get_c1_plot(self):
        """Plot the count of good C1 reviews against the PE threshold, per weight."""
        c1_threshold_list = np.arange(0, 1, 0.01)
        c1_reaction_weight_list = np.arange(0, 1.1, 0.1)
        stories = self._analysis_story_rows()
        records = []
        for reaction_weight in c1_reaction_weight_list:
            reaction_weight = np.round(reaction_weight, 2)
            for c1_threshold in c1_threshold_list:
                # Work on a copy so c1r values already computed by update_df()
                # for other rows still contribute to the sum, as before.
                df_c1 = self.df.copy()
                for story_rows in stories:
                    for index, row, shown, _ in self._iter_c1_decisions(
                            story_rows, reaction_weight, c1_threshold):
                        df_c1.at[index, 'c1r'] = self.get_criteria_review(
                            shown, review=row.e_review, neutral_emotion=row.Emotion == 'neutral')
                records.append({
                    'c1_threshold': c1_threshold,
                    'reaction_weight': reaction_weight,
                    'c1r_sum': df_c1['c1r'].sum(),
                })
        df_c1_analysis = pd.DataFrame(records)
        fig = px.line(data_frame=df_c1_analysis, x='c1_threshold', y='c1r_sum', color='reaction_weight')
        fig.update_layout(
            title="Criteria 1 analysis `PE > Threshold`",
            xaxis_title="PE Threshold",
            yaxis_title="Count of good reviews",
            font=dict(
                size=14,
                color="#006064"
            ),
        )
        self.placeholder['c1plot'].plotly_chart(fig, use_container_width=True)

    def get_c2_plot(self):
        """Plot the count of good C2 reviews against the confidence-score threshold."""
        c2_threshold_list = np.arange(0, 1, 0.01)
        stories = self._analysis_story_rows()
        c2r_sum_list = []
        for c2_threshold in c2_threshold_list:
            df_c2 = self.df.copy()
            for story_rows in stories:
                for index, row in story_rows.iterrows():
                    # NOTE(review): unlike update_df(), neutral emotions are not
                    # excluded from c2 here - confirm this is intended.
                    c2 = row.Score > c2_threshold
                    df_c2.at[index, 'c2r'] = self.get_criteria_review(
                        c2, review=row.e_review, neutral_emotion=row.Emotion == 'neutral')
            c2r_sum_list.append(df_c2['c2r'].sum())
        fig = px.line(x=c2_threshold_list, y=c2r_sum_list)
        fig.update_layout(
            title="Criteria 2 analysis `CS > Threshold`",
            xaxis_title="CS Threshold",
            yaxis_title="Count of good reviews",
            font=dict(
                size=14,
                color="#006064"
            ),
        )
        self.placeholder['c2plot'].plotly_chart(fig, use_container_width=True)

    @staticmethod
    def get_subset_condition(data, story_id):
        """Return a boolean mask selecting the user turns of ``story_id`` in ``data``."""
        return (data.Story == story_id) & (data.Turn == 'user')

    @staticmethod
    def get_criteria_review(c, review, neutral_emotion=False):
        """Score a criterion decision against the manual review mark.

        Args:
            c: the criterion's decision (truthy means "show the emotion").
            review: review mark; ``'o'`` or ``None`` counts as approval, any
                other value (``'x'``, blank, ...) does not.
            neutral_emotion: whether the row's emotion is ``'neutral'``.

        Returns:
            1 when the decision agrees with the review, else 0. With an
            approving review the decision must be truthy (or the emotion
            neutral); with any other review the decision must be falsy.
        """
        approved = review == 'o' or review is None
        if approved:
            result = bool(c) or bool(neutral_emotion)
        else:
            result = not c
        return np.round(int(result), 0)

    def get_ngram_pattern(self, s, n=2):
        """Return one character per length-``n`` window of ``s``: '1' if the window contains '1', else '0'."""
        return ''.join(
            '1' if '1' in s[start:start + n] else '0'
            for start in range(len(s) - (n - 1))
        )

    def update_df(self):
        """Render the parameter widgets, compute C1/C2 columns and show the tables.

        Adds the columns ``reaction_frequency``, ``probability_emote``,
        ``random_value``, ``reaction_show``, ``c1``, ``c2``, ``c1r`` and ``c2r``
        to ``self.df`` for the user rows of the selected stories, and stores
        ``self.show_pe_data`` / ``self.score_threshold`` for later use.
        """
        list_stories = self.df.Story.unique()
        total_num_stories = len(list_stories)
        # set_input is a project helper (presumably a linked slider/number
        # input - verify in .lib). Clamp the default so it never exceeds the
        # maximum when the log holds fewer than 9 stories.
        num_stories2show = int(set_input(self.container_param,
                                         label='No. of stories to show', min_value=1,
                                         max_value=total_num_stories,
                                         value=min(9, total_num_stories), step=1,
                                         key_slider='num_stories2show_slider',
                                         key_input='num_stories2show_input',))
        list_stories2show = list_stories[:num_stories2show]
        reaction_weight = set_input(self.container_param,
                                    label='Reaction Weight w', min_value=0.0, max_value=1.0,
                                    value=0.5, step=0.01,
                                    key_slider='w_slider', key_input='w_input',)
        self.container_param_rv = self.container_param.columns([1, 1])
        random_value_mode = self.container_param_rv[0].radio(
            "C1 Threshold type", ["Random", "Fixed"], index=1)
        # None means "draw a fresh random threshold for every evaluated row".
        fixed_threshold = None
        if random_value_mode == "Fixed":
            fixed_threshold = set_input(self.container_param,
                                        label='C1 Threshold',
                                        key_slider='rand_slider', key_input='rand_input',
                                        min_value=0.,
                                        max_value=1.,
                                        value=.5,
                                        step=.01,)
        c2_threshold = set_input(self.container_param,
                                 label='C2 Threshold', min_value=0.0, max_value=1.0,
                                 value=0.7, step=0.01,
                                 key_slider='c2_threshold_slider', key_input='c2_threshold_input',)
        table_mode = self.container_param.radio(
            "Table Style:", ["Dataframe", "Table"])
        self.show_pe_data = self.container_param.checkbox(
            'Show Probability Emote', value=True, key='show_pe_data_log')
        self.score_threshold = set_input(self.container_param,
                                         label='Score Threshold', min_value=0.0, max_value=1.0,
                                         value=0.5, step=0.01,
                                         key_slider='score_threshold_slider',
                                         key_input='score_threshold_input',)

        pattern_records = []
        pattern_index = []
        for story_id in list_stories2show:
            story_rows = self.df[self.get_subset_condition(self.df, story_id)]
            for index, row, shown, details in self._iter_c1_decisions(
                    story_rows, reaction_weight, fixed_threshold):
                if details is not None:
                    # Only evaluated rows get the PE columns; skipped rows stay NaN.
                    reaction_frequency, probability_emote, random_value = details
                    self.df.at[index, 'reaction_frequency'] = reaction_frequency
                    self.df.at[index, 'probability_emote'] = probability_emote
                    self.df.at[index, 'random_value'] = random_value
                    self.df.at[index, 'reaction_show'] = shown
                self.df.at[index, 'c1'] = shown
                c2 = row.Emotion != 'neutral' and row.Score > c2_threshold
                self.df.at[index, 'c2'] = c2
                review = self.df.e_review[index]
                is_neutral = row.Emotion == 'neutral'
                self.df.at[index, 'c1r'] = self.get_criteria_review(
                    shown, review=review, neutral_emotion=is_neutral)
                self.df.at[index, 'c2r'] = self.get_criteria_review(
                    c2, review=review, neutral_emotion=is_neutral)

            # Encode the story's evaluated reactions as a string of 0/1 digits.
            shown_flags = self.df[self.get_subset_condition(
                self.df, story_id)].reaction_show.dropna()
            s = ''.join(str(int(v)) for v in shown_flags)
            record = {
                'reaction_length': len(s),
                'reaction_1': s.count('1'),
                'reaction_pattern': s,
            }
            for n in range(2, 8):
                record[f'{n}-gram_pattern'] = self.get_ngram_pattern(s, n=n)
            pattern_records.append(record)
            pattern_index.append(f'Story_{story_id}')
        df_reaction_pattern = pd.DataFrame(pattern_records, index=pattern_index)

        df_show = self.df.copy()
        for c in ['c1r', 'c2r']:
            df_show[c] = df_show[c].fillna(0).astype(int)
            st.markdown(f'Sum of {c} : {df_show[c].sum()}')

        # The hidden columns depend only on the widget state, not on the story.
        columns2hide = ['Unnamed: 0', 'Story', ]
        if not self.debug:
            columns2hide += ['e_review']
        if self.emotion_type == 'Max-only':
            columns2hide += [
                f'Emotion_{sorted_i+1}' for sorted_i in range(7)]
            columns2hide += [
                f'Score_{sorted_i+1}' for sorted_i in range(7)]
        if not self.show_pe_data:
            columns2hide += self._PE_COLUMNS

        for story_id in list_stories2show:
            # errors='ignore': an absent optional column must not abort rendering.
            story_table = df_show[df_show.Story == story_id].drop(
                columns=columns2hide, errors='ignore')
            st.markdown(f'#### Story {story_id}')
            styled = story_table.style
            if self.show_pe_data:
                styled = styled.apply(self.dfstyle_color_text_col, axis=1)
            styled = styled.apply(self.rower, axis=None)
            styled = styled.set_table_styles([{
                'selector': 'tr:hover',
                'props': 'color: #000000'
            }])
            if table_mode == 'Dataframe':
                st.dataframe(styled)
            elif table_mode == 'Table':
                st.table(styled)
            create_dowload_button(
                styled, sheet_name=f'story_{story_id}', file_name=f'data_story_{story_id}.xlsx')

        if table_mode == 'Dataframe':
            st.dataframe(df_reaction_pattern)
        elif table_mode == 'Table':
            st.table(df_reaction_pattern)

    def dfstyle_color_text_col(self, s):
        """Return one CSS string per cell of row ``s``.

        Cells that are not equal to themselves (NaN) get the near-white text
        colour ``#fcfcfc``; every other cell gets a white background.
        """
        return [
            'color: #fcfcfc' if cell != cell else 'background-color: white'
            for cell in s
        ]

    @staticmethod
    def rower(data):
        """Return a CSS frame shaped like ``data`` that shades rows with an odd index label."""
        odd_rows = np.asarray(data.index % 2 != 0)
        # Repeat the per-row flag across all columns so the mask matches data's shape.
        mask = np.repeat(odd_rows[:, None], data.shape[1], axis=1)
        return pd.DataFrame(np.where(mask, 'background-color:#f9f9f9', ''),
                            index=data.index, columns=data.columns)

    def get_log(self):
        """Classify every ``H:``/``U:`` line of the log and cache the result as CSV.

        Lines that start with neither prefix increment the story counter and
        are otherwise skipped.

        Returns:
            DataFrame with one row per classified line (also written to
            ``self.df_path``).
        """
        with open(self.path) as f:
            lines = f.readlines()
        self.gen.initialise_classifier_model()

        # NOTE(review): the slider in display_logs() never yields 'max', so the
        # 'sorted' branch is what runs in practice - confirm intent.
        filter_by = 'max' if self.emotion_type == 'max' else 'sorted'
        records = []
        story_num = 0
        for line in lines:
            if line.startswith('H:'):
                turn = 'haru'
            elif line.startswith('U:'):
                turn = 'user'
            else:
                story_num += 1
                continue
            sentence = line[3:]
            record = {'Turn': turn, 'Sentence': sentence, 'Story': story_num}
            emotion = self.gen.get_emotion(sentence, filter_by=filter_by)
            if filter_by == 'max':
                record['Emotion'] = emotion['label']
                record['Score'] = emotion['score']
            else:
                for rank, candidate in enumerate(emotion, start=1):
                    record[f'Emotion_{rank}'] = candidate['label']
                    record[f'Score_{rank}'] = candidate['score']
                # The first entry is used as the headline emotion.
                record['Emotion'] = emotion[0]['label']
                record['Score'] = emotion[0]['score']
            record['e_review'] = ' '
            records.append(record)

        # Build the frame once instead of concatenating row by row.
        df = pd.DataFrame(records)
        os.makedirs(os.path.dirname(self.df_path), exist_ok=True)
        df.to_csv(self.df_path)
        return df
def display_logs(gen, container_guide, container_param, container_button):
    """Create a LogAnalyser for the given generator and containers and render it."""
    LogAnalyser(gen, container_guide, container_param, container_button).display_logs()
if __name__ == '__main__':
    # Launch the app through the project's run script. This needs the
    # module-level `import os`; the file previously imported only
    # `from os import listdir`, so this line raised NameError.
    os.system('./run.sh')