import logging
import os

import numpy as np
import pandas as pd
import pingouin as pg
import streamlit as st
from sentence_transformers import SentenceTransformer, util

import utils


def load_model():
    """Load SurveyBot3000 into the session state, once per session."""
    if st.session_state.get('model') is None:
        with st.spinner('Loading the model might take a couple of seconds...'):
            # Prefer a remote model path if one is configured.
            model_path = os.environ.get('remote_model_path') or os.getenv('model_path')
            # Fall back to the locally cached Hugging Face token if no
            # read token is set in the environment.
            auth_token = os.environ.get('read_models') or True
            st.session_state.model = SentenceTransformer(
                model_name_or_path=model_path,
                use_auth_token=auth_token
            )
            logging.info('Loaded SurveyBot3000!')


def process_yaml_input(yaml_dict):
    """Turn a {scale: [items]} mapping into a long-format DataFrame
    with one row per item."""
    input_data = pd.DataFrame({k: pd.Series(v) for k, v in yaml_dict.items()})
    df = (
        input_data
        .stack()
        .reset_index()
        .drop('level_0', axis=1)
        .rename(columns={'level_1': 'scale', 0: 'item'})
    )
    df['item'] = df['item'].apply(utils.clean_text)
    return df


def get_items_per_scale():
    """Return the number of items per scale."""
    input_data = st.session_state.input_data
    return input_data.groupby('scale').size().tolist()


def get_sign(df):
    """Sign of each item's correlation with the first item of its group.

    Expects the scale label in the first column and the embedding
    dimensions in the remaining columns; a sign of -1 marks an item as
    reverse-keyed relative to the first item.
    """
    sign = (
        df
        .T            # embedding dimensions as rows, items as columns
        .iloc[1:, :]  # drop the 'scale' row
        .corr()       # item-by-item correlations across dimensions
        .iloc[:, 0]   # each item's correlation with the first item
        .apply(np.sign)
    )
    return sign


def encode_input_data():
    """Embed every item with the loaded model."""
    with st.spinner('Encoding items...'):
        input_data = st.session_state.input_data
        input_data['embeddings'] = input_data.item.apply(
            lambda x: st.session_state.model.encode(
                sentences=x,
                convert_to_numpy=True
            )
        )
    return input_data


def get_synthetic_item_correlations():
    """Pairwise cosine similarities between the item embeddings."""
    # Stack the per-item vectors into a 2-D array so util.cos_sim
    # receives a proper matrix instead of a Series of arrays.
    embeddings = np.stack(st.session_state.input_data.embeddings.to_numpy())
    df = pd.DataFrame(
        data=util.cos_sim(a=embeddings, b=embeddings),
        columns=st.session_state.input_data.item,
        index=st.session_state.input_data.item
    ).round(2)
    if st.session_state.results_as_matrix is False:
        # Long format: one row per unordered item pair.
        df = (
            df
            .reset_index()
            .melt(id_vars=['item'], var_name='item_b', value_name='Θ')
            .rename(columns={'item': 'item_a'})
            .query('item_a < item_b')
        )
    return df


def get_synthetic_scale_correlations():
    """Correlations between mean scale embeddings, with reverse-keyed
    items flipped before averaging."""
    scales = st.session_state.input_data.scale
    embeddings = st.session_state.input_data.embeddings.apply(pd.Series)

    def average_scale_embeddings(group_data):
        group_embeddings = group_data.iloc[:, 1:]
        # Flip reverse-keyed items so they point in the same direction
        # as the rest of the scale.
        is_reversed = get_sign(group_data) == -1
        max_value = group_embeddings.max(axis=None)
        group_embeddings[is_reversed] = max_value - group_embeddings[is_reversed]
        return group_embeddings.T.mean(axis=1)

    df = (
        pd
        .concat([scales, embeddings], axis=1)
        .groupby('scale')
        .apply(average_scale_embeddings)
        .T
        .corr()
        .round(2)
    )
    if st.session_state.results_as_matrix is False:
        df = (
            df
            .reset_index()
            .melt(id_vars='scale', var_name='scale_b', value_name='Θ')
            .rename(columns={'scale': 'scale_a'})
            .query('scale_a < scale_b')
        )
    return df


def get_synthetic_reliabilities():
    """Cronbach's alpha (with 95% CI) per scale, computed on the item
    embeddings."""
    scales = st.session_state.input_data.scale
    embeddings = st.session_state.input_data.embeddings.apply(pd.Series)

    def get_reliability_by_group(group_data):
        group_embeddings = group_data.iloc[:, 1:]
        is_reversed = get_sign(group_data) == -1
        max_value = group_embeddings.max(axis=None)
        group_embeddings[is_reversed] = max_value - group_embeddings[is_reversed]
        # pingouin returns (alpha, array([ci_lower, ci_upper])).
        alpha, ci = pg.cronbach_alpha(data=group_embeddings.T)
        return [alpha, ci[0], ci[1]]

    data = (
        pd
        .concat([scales, embeddings], axis=1)
        .groupby('scale')
        .apply(get_reliability_by_group)
    )
    df = (
        pd.DataFrame(
            data=data.tolist(),
            index=data.index,
            columns=['alpha (Θ)', 'ci_lower', 'ci_upper']
        )
        .reset_index()
        .round(2)
    )
    return df