File size: 5,495 Bytes
786340e 2ca6dd2 786340e 2ca6dd2 786340e 2ca6dd2 786340e 2ca6dd2 786340e 2ca6dd2 786340e 2ca6dd2 786340e 2ca6dd2 786340e 2ca6dd2 786340e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 |
from io import StringIO
import streamlit as st
from smem_token_parser import SMEM_Parser, read_tokens_from_lines
from smem_obj import SMEM_Obj, ObjType
MIN_COLS = 4
# Set up page config before any other streamlit commands
st.set_page_config(page_title="Soar Agent Memory Inspector", layout="wide")
st.title("Logic Inspector")
def get_smem_root_from_file(smem_file):
# tokens = get_smem_tokens_from_local_file(smem_filename)
tokens = read_tokens_from_lines(smem_file)
if tokens == None:
st.error("Error reading file: '"+str(smem_file)+"'")
return None
parser = SMEM_Parser()
parser.parse_file(tokens)
return parser.get_context_root()
## DEFINE THE FILE UPLOADER ELEMENT
if "col_obj_list" not in st.session_state:
st.session_state["col_obj_list"] = None
file_upload_expander = st.expander(label="Select a file to inspect", expanded=(st.session_state.col_obj_list == None))
file = file_upload_expander.file_uploader(" ")
if file is not None:
if st.session_state.col_obj_list is None:
root = get_smem_root_from_file(StringIO(file.getvalue().decode("utf-8")))
if root:
st.session_state["col_obj_list"] = [None]*MIN_COLS
st.session_state.col_obj_list[0] = root
st.experimental_rerun()
else:
st.session_state["col_obj_list"] = None
if st.session_state.col_obj_list is None:
st.stop()
## DEFINE THE CONTENT FILTERS
@st.cache(show_spinner=False, hash_funcs={SMEM_Obj: id})
def get_filter_features_dict(root_obj):
return root_obj.get_referenced_features()
# Get the content to filter on
features_dict = get_filter_features_dict(st.session_state.col_obj_list[0])
filters_expander = st.expander(label="Filters")
filters_cols = filters_expander.columns(len(features_dict))
filters_dict = {}
for key, col in zip(features_dict, filters_cols):
filters_dict[key] = col.multiselect(label=key, options=["(none)"]+sorted(features_dict[key]))
## DEFINE THE KNOWLEDGE INSPECTOR COLUMNS
def add_col(index, obj):
st.session_state.col_obj_list = st.session_state.col_obj_list[:index+1]
st.session_state.col_obj_list.append(obj)
# st.session_state.current_tab_index = index+1
st.experimental_rerun()
def get_header_str(obj_type):
if obj_type == ObjType.CONTEXT:
return "START"
elif obj_type == ObjType.COND_CONJ:
return "CONDITION"
elif obj_type == ObjType.OP:
return "CHOICE"
elif obj_type == ObjType.ACT_GROUP:
return "RESULT"
else:
return " "
def get_tested_wmes_from_obj_str(obj_str):
attr_list = []
val_list = []
clauses = str(obj_str).split(" AND ")
for clause in clauses:
attr,val = clause.replace("(","").replace(")","").split(" is ", maxsplit=1)
attr_list += [attr]
val_list += [val]
return attr_list, val_list
st.subheader("Click the buttons below to select conditions and to inspect resulting actions.")
cols = st.columns(max(MIN_COLS,len(st.session_state.col_obj_list)))
# Iteratively build the columns of navigable knowlege elements
for i,col in enumerate(cols):
try:
if st.session_state.col_obj_list[i] == None:
break
except Exception as e:
# print("ERROR checking column "+str(i+1)+" of "+str(len(st.session_state.col_obj_list))+": "+str(e))
break
# Build the available objects to navigate under this col's object
obj = st.session_state.col_obj_list[i]
obj_str,obj_label,obj_desc = obj.to_string()
sub_objs = list(obj.get_child_objects(compact=True))
# print(obj.obj_type)
if len(sub_objs) > 0:
col.markdown("**"+get_header_str(sub_objs[0].obj_type)+"**",)
col.markdown("---")
# Show the object's main title and description
col.text(obj_str)
if obj_desc != None:
col.markdown("*"+obj_desc+"*")
# Print the child objects of this object as the items in this column
for j,sub in enumerate(sub_objs):
sub_str,sub_label,sub_desc = sub.to_string()
if sub_desc != None:
button_text = sub_desc
else:
button_text = sub_label
# Check filters
if sub.obj_type == ObjType.COND_PRIM or sub.obj_type == ObjType.COND_CONJ:
# Get the feature and value for this sub obj
keep = True
attr_list, val_list = get_tested_wmes_from_obj_str(sub_desc)
# Check each filter key for a match
for attr in filters_dict:
if attr not in attr_list:
# Filter doesn't apply for this object
continue
if val_list[attr_list.index(attr)] not in filters_dict[attr] and len(filters_dict[attr]) > 0:
# Value not present
keep = False
break
if not keep:
continue
if sub.obj_type == ObjType.OP:
subsub = list(sub.get_child_objects(compact=True))[0]
_,group_string,_ = subsub.to_string()
group_string = "\n * "+str(group_string).replace("AND", "\n * ")
col.markdown(group_string)
if col.button(button_text, key="button"+str(i)+"-"+str(j)):
add_col(i,sub)
elif sub.obj_type == ObjType.ACT_GROUP:
col.markdown("*Output Details:* ")
col.markdown("\n * "+str(sub_str).replace(") AND", ")\n * "))
elif col.button(button_text, key="button"+str(i)+"-"+str(j)):
add_col(i,sub)
|