# sna_g34 / app / files.py
# yashpulse's picture
# Final
# 7861c5d
import copy
import json
import time

import pandas as pd
import streamlit as st
import streamlit_antd_components as sac
from st_aggrid import AgGrid
# Choices offered by the per-column data-type radio, in display order.
_COLUMN_TYPES = ["STRING", "INT64", "FLOAT"]
# Choices offered by the per-sheet "Object Type" radio, in display order.
_OBJECT_TYPES = ["Table", "Relationship", "Ignore"]
# Session-state key remembering which upload has already been handed to `sna`.
_UPLOAD_TOKEN_KEY = "_files_last_upload_token"


def _render_upload():
    """Render the 'Upload' expander and pass a newly chosen file to `sna.add_file`."""
    with st.expander('Upload', False):
        uploaded_file = st.file_uploader("Choose a file", type=['xlsx'], accept_multiple_files=False)
        if uploaded_file is None:
            # Uploader was cleared: forget the token so the same file can be re-added.
            st.session_state.pop(_UPLOAD_TOKEN_KEY, None)
            return
        # The uploader keeps returning the same file on every rerun, so only
        # call add_file the first time a given upload is seen.
        token = getattr(uploaded_file, "file_id", None) or (uploaded_file.name, uploaded_file.size)
        if st.session_state.get(_UPLOAD_TOKEN_KEY) != token:
            st.session_state.sna.add_file(uploaded_file)
            st.session_state[_UPLOAD_TOKEN_KEY] = token
        st.info("File uploaded successfully.")


def _select_file():
    """Render the 'Files' expander and return the path of the single ticked file, or None."""
    with st.expander('Files', True):
        available = st.session_state.sna.get_files()
        # The tree shows base names only; map each label back to its full path.
        # Exact lookup avoids suffix collisions ("a.xlsx" vs "data.xlsx"). When
        # two paths share a base name the later one wins, as in the old loop.
        path_by_label = {path.split('/')[-1]: path for path in available}
        tree_items = [sac.TreeItem(path.split('/')[-1]) for path in available]
        ticked = sac.tree(items=tree_items, checkbox=True)
        # Only act when exactly one file is ticked; treat "no value" as none.
        if ticked is None or len(ticked) != 1:
            return None
        (label,) = ticked
        chosen = path_by_label.get(label)
        if chosen is not None:
            st.write("Selected file: ", chosen)
        return chosen


def _render_json_view(selected_path):
    """Render the 'View' expander for a JSON file."""
    with st.expander('View', False):
        with open(selected_path, encoding="utf-8") as f:
            data = json.load(f)
        st.json(data)


def _render_excel_view(selected_path):
    """Render the 'View' expander for a workbook: one tab per sheet, showing the chosen sheet."""
    with st.expander('View', False):
        all_sheets = pd.read_excel(selected_path, sheet_name=None, engine='openpyxl')
        tab_items = [sac.TabsItem(label=sheet_name) for sheet_name in all_sheets]
        # index=None: no tab is pre-selected, so nothing is shown until the user picks one.
        tab = sac.tabs(tab_items, key="view", index=None)
        if tab is not None and tab in all_sheets:
            st.dataframe(all_sheets[tab])


def _render_table_columns(sheet, sheet_name, helper, selected_path):
    """Render one data-type radio per column of a table sheet and store the choices."""
    st.write("Columns")
    columns = sheet.get("columns", {})
    for column in columns:
        current = columns[column]
        # Unknown/missing types fall back to the first option ("STRING").
        type_index = _COLUMN_TYPES.index(current) if current in _COLUMN_TYPES else 0
        value = st.radio(column, _COLUMN_TYPES, index=type_index, horizontal=True, key=f"{sheet_name}_{column}")
        if value is not None:
            columns[column] = value
            st.session_state[selected_path] = helper


def _render_relationship(sheet, sheet_name, helper, selected_path):
    """Render source/target table pickers for a relationship sheet and store the choices."""
    # Only sheets currently flagged as tables can be relationship endpoints.
    tables = [name for name, entry in helper.items() if entry.get("isTable")]
    st.write("Relationship")
    source_index = tables.index(sheet["source"]) if sheet.get("source") in tables else 0
    target_index = tables.index(sheet["target"]) if sheet.get("target") in tables else 0
    source = st.selectbox("Source", tables, index=source_index, key=f"source_{sheet_name}")
    target = st.selectbox("Target", tables, index=target_index, key=f"target_{sheet_name}")
    if source is not None:
        sheet["source"] = source
        st.session_state[selected_path] = helper
    if target is not None:
        sheet["target"] = target
        st.session_state[selected_path] = helper


def _render_object_properties(sheet, sheet_name, helper, selected_path):
    """Render object type, object name and (for tables) primary-key controls for one sheet."""
    st.write("Object Properties")
    if sheet["isTable"]:
        type_index = 0
    elif sheet["isRelationship"]:
        type_index = 1
    else:
        type_index = 2
    object_type = st.radio("Object Type", _OBJECT_TYPES, index=type_index, horizontal=True, key=f"object_type_{sheet_name}")

    # When the chosen type differs from the stored flags, update them and rerun
    # so the left-hand column is redrawn for the new object type.
    if object_type == "Table":
        if not sheet["isTable"]:
            sheet["isTable"] = True
            sheet["isRelationship"] = False
            st.session_state[selected_path] = helper
            st.rerun()
    elif object_type == "Relationship":
        if not sheet["isRelationship"]:
            sheet["isTable"] = False
            sheet["isRelationship"] = True
            st.session_state[selected_path] = helper
            st.rerun()
    else:
        if sheet["isTable"] or sheet["isRelationship"]:
            sheet["isTable"] = False
            sheet["isRelationship"] = False
            st.session_state[selected_path] = helper
            st.rerun()

    object_name = st.text_input("Object Name", sheet["object_name"])
    if object_name is not None:
        sheet["object_name"] = object_name
        st.session_state[selected_path] = helper

    if sheet["isTable"]:
        # Only columns typed INT64 are offered as primary-key candidates.
        int_64_keys = [name for name, dtype in sheet.get("columns", {}).items() if dtype == "INT64"]
        stored_key = sheet.get("primary_key")
        key_index = int_64_keys.index(stored_key) if stored_key in int_64_keys else 0
        primary_key = st.selectbox("Primary Key", int_64_keys, index=key_index, key=f"primary_key_{sheet_name}")
        if primary_key is not None:
            sheet["primary_key"] = primary_key
            st.session_state[selected_path] = helper


def _render_sheet_editor(helper, sheet_name, selected_path):
    """Render the three-column editor (details / properties / raw helper) for one sheet."""
    sheet = helper[sheet_name]
    details_col, properties_col, helper_col = st.columns([1, 1, 1])
    with details_col:
        if sheet["isTable"]:
            _render_table_columns(sheet, sheet_name, helper, selected_path)
        elif sheet["isRelationship"]:
            _render_relationship(sheet, sheet_name, helper, selected_path)
        else:
            st.write("Ignored")
    with properties_col:
        _render_object_properties(sheet, sheet_name, helper, selected_path)
    with helper_col:
        st.write("Helper Object")
        st.write(sheet)


def _render_load_panel(selected_path):
    """Render the 'Load' expander: Save/Reset/Load buttons plus the per-sheet helper editor."""
    with st.expander('Load', True):
        defaults = st.session_state.sna.generate_data_dictionary(selected_path)
        # Create the editable copy before any button can read it. Deep-copy so
        # edits to nested per-sheet dicts never touch `defaults` (the Reset baseline).
        if selected_path not in st.session_state:
            st.session_state[selected_path] = copy.deepcopy(defaults)

        load_result = None
        # 13 equal columns; the buttons occupy the last three so they sit right-aligned.
        controls = st.columns([1] * 13)
        with controls[-3]:
            if st.button("Save", type="primary"):
                st.rerun()
        with controls[-2]:
            if st.button("Reset", type="primary"):
                st.session_state[selected_path] = copy.deepcopy(defaults)
                st.rerun()
        with controls[-1]:
            if st.button("Load", type="primary"):
                load_result = st.session_state.sna.load_data(selected_path, st.session_state[selected_path])

        # NOTE(review): rendered at expander level, not inside the button column — confirm.
        if load_result is not None:
            # load_data's result is read as a mapping with "status" and "message".
            banner = st.empty()
            if load_result["status"]:
                banner.success(load_result["message"])
            else:
                banner.error(load_result["message"])
            # Show the outcome briefly, then clear it.
            time.sleep(3)
            banner.empty()

        # `helper` is the live session-state object, so in-place edits persist.
        helper = st.session_state[selected_path]
        tab_items = [sac.TabsItem(label=sheet_name) for sheet_name in helper]
        active_sheet = sac.tabs(tab_items, key='load')
        if active_sheet is not None and active_sheet in helper:
            _render_sheet_editor(helper, active_sheet, selected_path)


def files():
    """Render the 'Files' page: upload, file selection, preview and load configuration.

    Reads and writes ``st.session_state``: the ``sna`` entry supplies
    ``add_file``, ``get_files``, ``generate_data_dictionary`` and ``load_data``;
    the per-file helper dictionary being edited is stored under the selected
    file's path. Returns ``None``; all output is Streamlit UI.

    NOTE(review): the original indentation was lost, so nesting was
    reconstructed. Assumed: only the subheader sits in the first layout column,
    and the 'Load' expander is shown for non-JSON files only — confirm.
    """
    st.title('Files')
    col = st.columns([4, 1])
    with col[0]:
        st.subheader("Below are the files that are uploaded for current session.")

    _render_upload()
    selected_path = _select_file()
    if selected_path is None:
        return

    if str(selected_path).endswith(".json"):
        _render_json_view(selected_path)
    else:
        _render_excel_view(selected_path)
        _render_load_panel(selected_path)