bryan-stearns
Now resets display when filters change
9df3688
from io import StringIO
import streamlit as st
from smem_token_parser import SMEM_Parser, read_tokens_from_lines
from smem_obj import SMEM_Obj, ObjType
MIN_COLS = 4
# Set up page config before any other streamlit commands
st.set_page_config(page_title="Soar Agent Memory Inspector", layout="wide")
st.title("Logic Inspector")
def get_smem_root_from_file(smem_file):
# tokens = get_smem_tokens_from_local_file(smem_filename)
tokens = read_tokens_from_lines(smem_file)
if tokens == None:
st.error("Error reading file: '"+str(smem_file)+"'")
return None
parser = SMEM_Parser()
parser.parse_file(tokens)
return parser.get_context_root()
def reset_cols():
st.session_state.col_obj_list = [None if i > 0 else x for i,x in enumerate(st.session_state.col_obj_list)]
## DEFINE THE FILE UPLOADER ELEMENT
if "col_obj_list" not in st.session_state:
st.session_state["col_obj_list"] = None
file_upload_expander = st.expander(label="Select a file to inspect", expanded=(st.session_state.col_obj_list == None))
file = file_upload_expander.file_uploader(" ")
if file is not None:
if st.session_state.col_obj_list is None:
root = get_smem_root_from_file(StringIO(file.getvalue().decode("utf-8")))
if root:
st.session_state["col_obj_list"] = [None]*MIN_COLS
st.session_state.col_obj_list[0] = root
st.experimental_rerun()
else:
st.session_state["col_obj_list"] = None
if st.session_state.col_obj_list is None:
st.stop()
## DEFINE THE CONTENT FILTERS
@st.cache(show_spinner=False, hash_funcs={SMEM_Obj: id})
def get_filter_features_dict(root_obj):
return root_obj.get_referenced_features()
# Get the content to filter on
features_dict = get_filter_features_dict(st.session_state.col_obj_list[0])
filters_expander = st.expander(label="Filters")
filters_cols = filters_expander.columns(min(len(features_dict), 5)) # Show 5 filters per row
filters_dict = {}
for i,key in enumerate(features_dict):
col = filters_cols[i % len(filters_cols)]
filters_dict[key] = col.multiselect(label=key, options=["(none)"]+sorted(features_dict[key]), on_change=reset_cols)
## DEFINE THE KNOWLEDGE INSPECTOR COLUMNS
def add_col(index, obj):
st.session_state.col_obj_list = st.session_state.col_obj_list[:index+1]
st.session_state.col_obj_list.append(obj)
# st.session_state.current_tab_index = index+1
st.experimental_rerun()
def get_header_str(obj_type):
if obj_type == ObjType.CONTEXT:
return "START"
elif obj_type == ObjType.COND_CONJ:
return "CONDITION"
elif obj_type == ObjType.OP:
return "CHOICE"
elif obj_type == ObjType.ACT_GROUP:
return "RESULT"
else:
return " "
def get_tested_wmes_from_obj_str(obj_str):
attr_list = []
val_list = []
clauses = str(obj_str).split(" AND ")
for clause in clauses:
attr,val = clause.replace("(","").replace(")","").split(" is ", maxsplit=1)
attr_list += [attr]
val_list += [val]
return attr_list, val_list
st.subheader("Click the buttons below to select conditions and to inspect resulting actions.")
cols = st.columns(max(MIN_COLS,len(st.session_state.col_obj_list)))
# Iteratively build the columns of navigable knowlege elements
for i,col in enumerate(cols):
try:
if st.session_state.col_obj_list[i] == None:
break
except Exception as e:
# print("ERROR checking column "+str(i+1)+" of "+str(len(st.session_state.col_obj_list))+": "+str(e))
break
# Build the available objects to navigate under this col's object
obj = st.session_state.col_obj_list[i]
obj_str,obj_label,obj_desc = obj.to_string()
sub_objs = list(obj.get_child_objects(compact=True))
sub_objs.sort(key=lambda x:x.to_string()[1])
# print(obj.obj_type)
if len(sub_objs) > 0:
col.markdown("**"+get_header_str(sub_objs[0].obj_type)+"**",)
col.markdown("---")
# Show the object's main title and description
col.text(obj_str)
if obj_desc != None:
col.markdown("*"+obj_desc+"*")
# Print the child objects of this object as the items in this column
for j,sub in enumerate(sub_objs):
sub_str,sub_label,sub_desc = sub.to_string()
if sub_desc != None:
button_text = sub_desc
else:
button_text = sub_label
# Check filters
if sub.obj_type == ObjType.COND_PRIM or sub.obj_type == ObjType.COND_CONJ:
# Get the feature and value for this sub obj
keep = True
attr_list, val_list = get_tested_wmes_from_obj_str(sub_desc)
# Check each filter key for a match
for attr in filters_dict:
if attr not in attr_list:
# Filter doesn't apply for this object
continue
if val_list[attr_list.index(attr)] not in filters_dict[attr] and len(filters_dict[attr]) > 0:
# Value not present
keep = False
break
if not keep:
continue
if sub.obj_type == ObjType.OP:
subsub = list(sub.get_child_objects(compact=True))[0]
_,group_string,_ = subsub.to_string()
group_string = "\n * "+str(group_string).replace("AND", "\n * ")
col.markdown(group_string)
if col.button(button_text, key="button"+str(i)+"-"+str(j)):
add_col(i,sub)
elif sub.obj_type == ObjType.ACT_GROUP:
col.markdown("*Output Details:* ")
col.markdown("\n * "+str(sub_str).replace(") AND", ")\n * "))
elif col.button(button_text, key="button"+str(i)+"-"+str(j)):
add_col(i,sub)