"""Streamlit page for inspecting a Soar agent's semantic-memory knowledge.

The script lets the user upload a memory dump, parses it into a tree of
``SMEM_Obj`` nodes, and renders the tree as a row of columns: clicking a
button in one column opens the selected child object in the next column.
"""

from io import StringIO

import streamlit as st

from smem_token_parser import SMEM_Parser, read_tokens_from_lines
from smem_obj import SMEM_Obj, ObjType

# Minimum number of inspector columns drawn on the page.
MIN_COLS = 4
# Maximum number of filter widgets placed side by side in the filter expander.
MAX_FILTER_COLS = 5

# NOTE(review): st.cache and st.experimental_rerun are deprecated in newer
# Streamlit releases; they are kept as-is because the pinned Streamlit
# version is not visible from this file -- confirm before upgrading.

# Set up page config before any other streamlit commands
st.set_page_config(page_title="Soar Agent Memory Inspector", layout="wide")
st.title("Logic Inspector")


def get_smem_root_from_file(smem_file):
    """Parse *smem_file* and return the root context object.

    Args:
        smem_file: File-like object handed to ``read_tokens_from_lines``.

    Returns:
        The value of ``SMEM_Parser.get_context_root()`` after parsing, or
        ``None`` (after showing an error on the page) when no tokens could
        be read.
    """
    tokens = read_tokens_from_lines(smem_file)
    if tokens is None:
        st.error(f"Error reading file: '{smem_file!s}'")
        return None
    parser = SMEM_Parser()
    parser.parse_file(tokens)
    return parser.get_context_root()


def reset_cols():
    """Clear every inspector column except the first (root) one.

    Used as the ``on_change`` callback of the filter widgets so a changed
    filter collapses the navigation back to the root column.
    """
    current = st.session_state.col_obj_list
    st.session_state.col_obj_list = current[:1] + [None] * (len(current) - 1)


## DEFINE THE FILE UPLOADER ELEMENT
if "col_obj_list" not in st.session_state:
    st.session_state["col_obj_list"] = None

# The uploader is shown expanded only while no file has been loaded yet.
file_upload_expander = st.expander(
    label="Select a file to inspect",
    expanded=(st.session_state.col_obj_list is None),
)
file = file_upload_expander.file_uploader(" ")
if file is not None:
    if st.session_state.col_obj_list is None:
        root = get_smem_root_from_file(StringIO(file.getvalue().decode("utf-8")))
        if root:
            st.session_state["col_obj_list"] = [None] * MIN_COLS
            st.session_state.col_obj_list[0] = root
            st.experimental_rerun()
else:
    # No file selected (or the file was removed): drop any loaded state.
    st.session_state["col_obj_list"] = None

if st.session_state.col_obj_list is None:
    st.stop()


## DEFINE THE CONTENT FILTERS
@st.cache(show_spinner=False, hash_funcs={SMEM_Obj: id})
def get_filter_features_dict(root_obj):
    """Return ``root_obj.get_referenced_features()``, cached per root object."""
    return root_obj.get_referenced_features()


# Get the content to filter on
features_dict = get_filter_features_dict(st.session_state.col_obj_list[0])
filters_expander = st.expander(label="Filters")
filters_dict = {}
# Streamlit rejects a column count of zero, so only lay out filter columns
# when there is at least one feature to filter on.
if features_dict:
    filters_cols = filters_expander.columns(min(len(features_dict), MAX_FILTER_COLS))
    for i, key in enumerate(features_dict):
        filter_col = filters_cols[i % len(filters_cols)]
        filters_dict[key] = filter_col.multiselect(
            label=key,
            options=["(none)"] + sorted(features_dict[key]),
            on_change=reset_cols,
        )


## DEFINE THE KNOWLEDGE INSPECTOR COLUMNS
def add_col(index, obj):
    """Show *obj* in the column right after *index* and rerun the script.

    Every column to the right of *index* is discarded first.
    """
    st.session_state.col_obj_list = st.session_state.col_obj_list[:index + 1]
    st.session_state.col_obj_list.append(obj)
    st.experimental_rerun()


def get_header_str(obj_type):
    """Return the column header text for *obj_type* (``" "`` if unknown)."""
    if obj_type == ObjType.CONTEXT:
        return "START"
    if obj_type == ObjType.COND_CONJ:
        return "CONDITION"
    if obj_type == ObjType.OP:
        return "CHOICE"
    if obj_type == ObjType.ACT_GROUP:
        return "RESULT"
    return " "


def get_tested_wmes_from_obj_str(obj_str):
    """Split a condition description into parallel attribute/value lists.

    *obj_str* is converted with ``str()``, split on ``" AND "``, stripped of
    parentheses, and each clause is split on its first ``" is "``. Clauses
    that contain no ``" is "`` (including the text ``"None"`` produced by a
    missing description) are skipped instead of raising ``ValueError``.

    Returns:
        ``(attr_list, val_list)`` where ``val_list[k]`` is the value tested
        for ``attr_list[k]``.
    """
    attr_list = []
    val_list = []
    for clause in str(obj_str).split(" AND "):
        attr, sep, val = clause.replace("(", "").replace(")", "").partition(" is ")
        if not sep:
            continue
        attr_list.append(attr)
        val_list.append(val)
    return attr_list, val_list


def _passes_filters(desc, active_filters):
    """Return True when the condition described by *desc* survives the filters.

    A filter only applies when it has at least one selected value and its
    attribute is tested by the condition; in that case the tested value
    (first occurrence of the attribute) must be among the selected values.
    """
    attr_list, val_list = get_tested_wmes_from_obj_str(desc)
    for attr, selected in active_filters.items():
        if not selected or attr not in attr_list:
            # Filter doesn't apply for this object
            continue
        if val_list[attr_list.index(attr)] not in selected:
            # Value not present
            return False
    return True


st.subheader("Click the buttons below to select conditions and to inspect resulting actions.")
cols = st.columns(max(MIN_COLS, len(st.session_state.col_obj_list)))

# Iteratively build the columns of navigable knowledge elements
for i, col in enumerate(cols):
    col_objs = st.session_state.col_obj_list
    # The object list can be shorter than the number of drawn columns
    # (add_col truncates it), so stop at its end or at the first empty slot.
    if i >= len(col_objs) or col_objs[i] is None:
        break

    # Build the available objects to navigate under this col's object
    obj = col_objs[i]
    obj_str, obj_label, obj_desc = obj.to_string()
    sub_objs = list(obj.get_child_objects(compact=True))
    sub_objs.sort(key=lambda x: x.to_string()[1])
    if sub_objs:
        col.markdown("**" + get_header_str(sub_objs[0].obj_type) + "**")
        col.markdown("---")

    # Show the object's main title and description
    col.text(obj_str)
    if obj_desc is not None:
        col.markdown("*" + obj_desc + "*")

    # Print the child objects of this object as the items in this column
    for j, sub in enumerate(sub_objs):
        sub_str, sub_label, sub_desc = sub.to_string()
        button_text = sub_desc if sub_desc is not None else sub_label
        button_key = f"button{i}-{j}"

        # Check filters (only condition objects are filtered)
        if sub.obj_type == ObjType.COND_PRIM or sub.obj_type == ObjType.COND_CONJ:
            if not _passes_filters(sub_desc, filters_dict):
                continue

        if sub.obj_type == ObjType.OP:
            # Preview the operator's first child as a bullet list above its button.
            subsub = list(sub.get_child_objects(compact=True))[0]
            _, group_string, _ = subsub.to_string()
            group_string = "\n * " + str(group_string).replace("AND", "\n * ")
            col.markdown(group_string)
            if col.button(button_text, key=button_key):
                add_col(i, sub)
        elif sub.obj_type == ObjType.ACT_GROUP:
            # Action groups are terminal: show their details instead of a button.
            col.markdown("*Output Details:* ")
            col.markdown("\n * " + str(sub_str).replace(") AND", ")\n * "))
        elif col.button(button_text, key=button_key):
            add_col(i, sub)