import streamlit as st import pathlib import json import pandas as pd st. set_page_config(layout="wide") st.header("Time Series Preprocessing Pipeline") st.markdown("Users can load their time-series data and select a set of transformations to prepare a training set for univariate or multivariate time-series classification.\ Go ahead and use the sidebar on the left to upload your data files in *.json* format and start exploring and transforming it!") col1, col2 = st.columns(2) @st.experimental_memo def convert_df(df): return df.to_csv(index=False).encode('utf-8') # Load a prepare data file_names, file_bytes = [], [] with st.sidebar: files = st.file_uploader("Load files", accept_multiple_files = True) if files: file_names = [file.name for file in files] file_bytes = [file.getvalue() for file in files] st.success("Your data has been successfully loaded! ๐Ÿค—") data_dict = dict({'trial_id':[], 'pupil_dilation':[], 'baseline':[], 'rating':[]}) with st.spinner("Building base dictionary..."): for file_data in file_bytes: data = json.loads(file_data) for k in data: for i in data[k]: for k, v in i.items(): data_dict[k].append(v) df_base = pd.DataFrame() # {'' : []}) with col1: if file_bytes: with st.spinner("Building base dataframe..."): df_base = pd.DataFrame.from_dict(data_dict) df_base["trial_id"] = df_base.trial_id.map(lambda s: "".join([c for c in s if c.isdigit()])) df_base["len_pupil_dilation"] = df_base.pupil_dilation.map(lambda l: len(l)) df_base["len_baseline"] = df_base.baseline.map(lambda l: len(l)) st.info(f"number of files: {len(file_names)}") if 'df_base' not in st.session_state: st.session_state['df_base'] = df_base else: st.caption("Upload your data using the sidebar to start :sunglasses:") if 'df_base' in st.session_state: st.markdown("Your original data with some extra information about the length of the time-series fields") st.dataframe(st.session_state.df_base) # Cleaning starts with col1: if not df_base.empty: st.markdown("**Cleaning actions**") detect_blinking = st.button("I want to clean my data ๐Ÿค—") number_of_blinks = 0 if detect_blinking: # Initialization of session_state if 'df' not in st.session_state: st.session_state['df'] = df_base for ser in df_base['pupil_dilation']: for f in ser: if f == 0.0: number_of_blinks += 1 for ser in df_base['baseline']: for f in ser: if f == 0.0: number_of_blinks += 1 # Initialization of session_state if 'blinks' not in st.session_state: st.session_state['blinks'] = number_of_blinks if "blinks" in st.session_state.keys(): st.info(f"blinking values (0.0) were found in {number_of_blinks} time-steps in all your data") remove_blinking = st.button("Remove blinking ๐Ÿงน") # df in column 2 if remove_blinking: df_right = st.session_state.df.copy(deep=True) df_right.pupil_dilation = df_right.pupil_dilation.map(lambda ser: [f for f in ser if f != 0.0]) df_right.baseline = df_right.baseline.map(lambda ser: [f for f in ser if f != 0.0]) st.session_state['df'] = df_right.copy(deep=True) st.success("Blinking values have been removed!") st.session_state.df_base = df_right elif detect_blinking and not number_of_blinks: st.caption("No blinking values were found in your data! ") # Add calculated fields if 'df' in st.session_state or 'df_right' in st.session_state: df_right = st.session_state.df.copy(deep=True) if "baseline" in list(df_right.keys()): st.markdown(f"A **baseline** feature has been found on your data, do you want to merge it with any of the other features in a new calculated field?") option = st.multiselect('Select a feature to create relative calculated feature โž•', [k for k in list(df_right.keys()) if k != 'baseline'], [[k for k in list(df_right.keys()) if k != 'baseline'][-4]]) relative_key = f"relative_{option[0]}" add_relative = st.button(f"Add {relative_key}") if add_relative: baseline_mean = [sum(s)/len(s) for s in df_right['baseline']] df_right[relative_key] = [[field_value - baseline_mean[i] for field_value in df_right[option[0]][i]] for i in range(len(df_right))] st.markdown("After adding calculated fields and removing blinking values (when applied)") st.dataframe(df_right) csv = convert_df(df_right) if 'df_right' not in st.session_state: st.session_state['df_right'] = df_right # Save transformations to disk downl = st.download_button("Download CSV ๐Ÿ’พ", csv, "file.csv", "text/csv", key='download-csv') if downl: st.info("Your data has been downloaded, you can visualize and detect outliers in the 'Plotting' and 'Detect Outliers' pages on the sidebar.") if not df_base.empty: with col1: st.warning("Consider running outlier detection to clean your data!", icon="โš ๏ธ")