|
import os |
|
import logging |
|
import numpy as np |
|
import streamlit as st |
|
import pandas as pd |
|
import pingouin as pg |
|
import sentence_transformers |
|
from sentence_transformers import SentenceTransformer, util |
|
|
|
import utils |
|
|
|
# Tiny negative offset added to correlations before np.sign() in get_sign(),
# so an exactly-zero correlation maps to a deterministic (negative) sign
# instead of 0. NOTE(review): the specific magnitude looks arbitrary — confirm
# with the original authors whether -0.00599 carries additional meaning.
SYNTHETIC_CONST = -0.00599
|
|
|
def load_model():
    """Ensure the SurveyBot3000 SentenceTransformer is cached in the session.

    The model is loaded at most once per Streamlit session into
    ``st.session_state.model``. The 'remote_model_path' environment variable
    takes precedence over 'model_path'; 'read_models' is passed as the auth
    token when set, otherwise ``True`` (presumably to fall back on a locally
    stored Hugging Face login — confirm).
    """
    if st.session_state.get('model') is None:
        with st.spinner('Loading the model might take a couple of seconds...'):
            # Prefer the remote path; an unset/empty value falls through.
            model_path = os.environ.get('remote_model_path') or os.getenv('model_path')
            auth_token = os.environ.get('read_models') or True

            st.session_state.model = SentenceTransformer(
                model_name_or_path=model_path,
                use_auth_token=auth_token,
            )

    # Logged on every call, whether or not a load just happened.
    logging.info('Loaded SurveyBot3000!')
|
|
|
def process_yaml_input(yaml_dict):
    """Turn a ``{scale_name: [item, ...]}`` mapping into a tidy DataFrame.

    Returns a two-column DataFrame ('scale', 'item') with one row per item;
    item texts are passed through ``utils.clean_text``.
    """
    # pd.Series per scale so ragged (unequal-length) item lists are allowed.
    wide = pd.DataFrame({key: pd.Series(values) for key, values in yaml_dict.items()})

    # Long format: one (scale, item) row per entry.
    tidy = wide.stack().reset_index()
    tidy = tidy.drop('level_0', axis=1)
    tidy = tidy.rename(columns={'level_1': 'scale', 0: "item"})

    tidy['item'] = tidy['item'].map(utils.clean_text)

    return tidy
|
|
|
def get_items_per_scale(input_data=None):
    """Return the number of items belonging to each scale.

    Parameters
    ----------
    input_data : pd.DataFrame, optional
        DataFrame with a 'scale' column. Defaults to
        ``st.session_state.input_data``, so existing zero-argument call
        sites keep working; passing a frame explicitly makes the function
        usable (and testable) outside a Streamlit session.

    Returns
    -------
    list[int]
        Item counts per scale, in groupby (sorted scale-name) order.
    """
    if input_data is None:
        input_data = st.session_state.input_data
    return input_data.groupby('scale').size().tolist()
|
|
|
def get_sign(df, tiebreak=None):
    """Sign (+1.0 / -1.0) of each item's correlation with the first item.

    ``df`` is expected to have one row per item, its first column being the
    scale label and the remaining columns numeric embedding dimensions.
    Items are correlated against the first item across those dimensions,
    and a small offset is added before taking the sign so an exactly-zero
    correlation still yields a deterministic sign (never 0).

    Parameters
    ----------
    df : pd.DataFrame
        First column: scale label; remaining columns: embedding dimensions.
    tiebreak : float, optional
        Offset added before ``np.sign``. Defaults to ``SYNTHETIC_CONST``
        (resolved at call time), preserving the original behavior while
        letting callers supply their own tie-break.

    Returns
    -------
    pd.Series
        One sign per item, indexed like ``df``'s rows.
    """
    if tiebreak is None:
        tiebreak = SYNTHETIC_CONST

    correlations = (
        df
        .T               # rows become: scale label + embedding dimensions
        .iloc[1:, :]     # drop the scale-label row, keep numeric dimensions
        .corr()          # item-by-item correlation matrix
        .iloc[:, 0]      # each item's correlation with the first item
    )
    return correlations.apply(lambda x: np.sign(x + tiebreak))
|
|
|
def encode_input_data():
    """Embed every item text with the session model.

    Adds an 'embeddings' column to ``st.session_state.input_data`` (the
    frame is mutated in place) and returns that same DataFrame.
    """
    with st.spinner('Encoding items...'):
        data = st.session_state.input_data
        data['embeddings'] = data['item'].apply(
            lambda text: st.session_state.model.encode(
                sentences=text,
                convert_to_numpy=True,
            )
        )

    return data
|
|
|
def get_synthetic_item_correlations():
    """Pairwise cosine similarities between all item embeddings.

    Returns a rounded item-by-item similarity matrix, or — when
    ``st.session_state.results_as_matrix`` is False — a long-format table
    with one row per unique item pair (columns: item_a, item_b, Θ).
    """
    items = st.session_state.input_data.item
    vectors = st.session_state.input_data.embeddings

    similarities = util.cos_sim(a=vectors, b=vectors)
    df = pd.DataFrame(data=similarities, columns=items, index=items).round(2)

    if st.session_state.results_as_matrix is False:
        # Reshape to long format, keeping each unordered pair exactly once
        # and dropping self-similarities.
        pairs = df.reset_index()
        pairs = pairs.melt(id_vars=['item'], var_name='item_b', value_name='Θ')
        pairs = pairs.rename(columns={'item': 'item_a'})
        df = pairs.query('item_a < item_b')

    return df
|
|
|
def get_synthetic_scale_correlations():
    """Correlations between scale-level embeddings.

    Items within each scale are sign-aligned, averaged into a single
    embedding per scale, and the scale embeddings are then correlated with
    one another. Returns either the full correlation matrix or a
    long-format table of unique scale pairs, depending on
    ``st.session_state.results_as_matrix``.
    """

    # Per-item scale labels, plus the embedding vectors expanded into one
    # column per embedding dimension.
    scales = st.session_state.input_data.scale
    embeddings = st.session_state.input_data.embeddings.apply(pd.Series)

    def average_scale_embeddings(group_data):
        # group_data: one row per item; column 0 is the scale label, the
        # remaining columns are embedding dimensions. Drop the label here.
        group_embeddings = group_data.iloc[:,1:]

        # Items correlating negatively with the group's first item are
        # treated as reverse-keyed and mirrored against the group maximum
        # before averaging. NOTE(review): mirroring with (max - x) assumes
        # embedding values share a comparable range — confirm this holds
        # for the model's output.
        is_reversed = get_sign(group_data) == -1
        max_value = group_embeddings.max(axis=None)
        group_embeddings[is_reversed] = max_value - group_embeddings[is_reversed]

        # One mean embedding per scale (mean over items, per dimension).
        mean_group_embeddings = group_embeddings.T.mean(axis=1)

        return(mean_group_embeddings)

    # scale x scale correlation matrix of the averaged embeddings.
    df = (
        pd
        .concat([scales, embeddings], axis=1)
        .groupby('scale')
        .apply(lambda group: average_scale_embeddings(group))
        .T
        .corr()
        .round(2)
    )

    # Optionally reshape to long format with one row per unique scale pair;
    # the query keeps a single ordering of each pair and drops self-pairs.
    if st.session_state.results_as_matrix is False:
        df = (
            df
            .reset_index()
            .melt(id_vars='scale', var_name='scale_b', value_name='Θ')
            .rename(columns={'scale': 'scale_a'})
            .query('scale_a < scale_b')
        )

    return(df)
|
|
|
def get_synthetic_reliabilities():
    """Cronbach's alpha (with 95% CI) per scale, computed on item embeddings.

    Reads ``st.session_state.input_data`` (columns: 'scale', 'item',
    'embeddings').

    Returns
    -------
    pd.DataFrame
        Columns ['scale', 'alpha (Θ)', 'ci_lower', 'ci_upper'], one row per
        scale, values rounded to two decimals.
    """
    scales = st.session_state.input_data.scale
    embeddings = st.session_state.input_data.embeddings.apply(pd.Series)

    def get_reliability_by_group(group_data):
        # group_data: one row per item; column 0 is the scale label, the
        # rest are embedding dimensions. Drop the label column.
        group_embeddings = group_data.iloc[:,1:]

        # Mirror reverse-keyed items (negative correlation with the group's
        # first item) against the group maximum so all items point the same
        # way — the same alignment used for the scale correlations.
        is_reversed = get_sign(group_data) == -1
        max_value = group_embeddings.max(axis=None)
        group_embeddings[is_reversed] = max_value - group_embeddings[is_reversed]

        # pingouin returns (alpha, [ci_lower, ci_upper]); flatten it.
        alpha, (ci_lower, ci_upper) = pg.cronbach_alpha(data=group_embeddings.T)
        return [alpha, ci_lower, ci_upper]

    results = (
        pd
        .concat([scales, embeddings], axis=1)
        .groupby('scale')
        .apply(get_reliability_by_group)
    )

    # Build the table straight from the grouped Series. The previous code
    # re-materialized `results.tolist()` inside the comprehension on every
    # iteration; constructing once is both simpler and O(n).
    df = pd.DataFrame(
        data=results.tolist(),
        index=results.index,  # index name 'scale' becomes a column below
        columns=['alpha (Θ)', 'ci_lower', 'ci_upper'],
    ).reset_index().round(2)

    return df
|
|