import base64
import logging
import pathlib
import re
import sys
import uuid
import streamlit as st
import yaml
from obsei.configuration import ObseiConfiguration
# Configure the root logger to emit INFO-and-above records on stdout, then
# create the logger used by this module.
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)
def img_to_bytes(img_path):
    """Return the contents of the file at ``img_path`` as a base64 text string.

    Args:
        img_path: Path of the file to read; handed to ``pathlib.Path``.

    Returns:
        The base64 encoding of the file's raw bytes, decoded to ``str``.
    """
    raw = pathlib.Path(img_path).read_bytes()
    return base64.b64encode(raw).decode()
# Copied from https://github.com/jrieke/traingenerator/blob/main/app/utils.py
def download_button(
    object_to_download, download_filename, button_text  # , pickle_it=False
):
    """Render a download link for ``object_to_download`` through ``st.markdown``.

    The payload is base64-encoded and embedded in the link's ``href`` as a data
    URI, so no file is written.

    NOTE(review): in the reviewed copy of this file the markup inside the
    ``custom_css`` and ``dl_link`` literals was missing, which left the f-strings
    unterminated. The HTML/CSS below is rebuilt from the values this function
    computes (``button_id``, ``b64``, ``download_filename``, ``button_text``).
    Confirm it against the upstream helper named in the comment above this
    function.

    Args:
        object_to_download: Payload to offer. Objects with an ``encode()``
            method (e.g. ``str``) are encoded first; anything else is passed
            to ``base64.b64encode`` unchanged, so it must be bytes-like.
        download_filename: File name placed in the link's ``download`` attribute.
        button_text: Text shown inside the link.

    Returns:
        None. The markup is emitted with ``unsafe_allow_html=True``.
    """
    # Some str <-> bytes conversion is necessary here: only the ``encode()``
    # call is guarded, so unrelated AttributeErrors are not masked.
    try:
        payload = object_to_download.encode()
    except AttributeError:
        payload = object_to_download
    b64 = base64.b64encode(payload).decode()

    # Per-call element id: a random UUID's hex with all digits removed, which
    # leaves only the letters a-f (presumably so the id is a valid CSS selector).
    button_id = re.sub(r"\d+", "", uuid.uuid4().hex)

    custom_css = f"""
        <style>
            #{button_id} {{
                display: inline-flex;
                align-items: center;
                justify-content: center;
                background-color: rgb(255, 255, 255);
                color: rgb(38, 39, 48);
                padding: .25rem .75rem;
                position: relative;
                text-decoration: none;
                border-radius: 4px;
                border-width: 1px;
                border-style: solid;
                border-color: rgb(230, 234, 241);
                border-image: initial;
            }}
            #{button_id}:hover {{
                border-color: rgb(246, 51, 102);
                color: rgb(246, 51, 102);
            }}
            #{button_id}:active {{
                box-shadow: none;
                background-color: rgb(246, 51, 102);
                color: white;
            }}
        </style>
    """

    dl_link = (
        custom_css
        + f'<a download="{download_filename}" id="{button_id}" '
        f'href="data:file/txt;base64,{b64}">{button_text}</a><br></br>'
    )

    st.markdown(dl_link, unsafe_allow_html=True)
def get_obsei_config(current_path, file_name):
    """Build an ``ObseiConfiguration`` and return its ``configuration`` attribute.

    Args:
        current_path: Passed to ``ObseiConfiguration`` as ``config_path``.
        file_name: Passed to ``ObseiConfiguration`` as ``config_filename``.

    Returns:
        The ``configuration`` attribute of the constructed object.
    """
    obsei_configuration = ObseiConfiguration(
        config_path=current_path, config_filename=file_name
    )
    return obsei_configuration.configuration
@st.cache
def get_icon_name(name, icon, icon_size=40, font_size=1):
    """Return an HTML snippet showing ``icon``, optionally followed by ``name``.

    NOTE(review): in the reviewed copy of this file the markup inside these
    string literals was missing, which left the f-strings unterminated and made
    the no-name branch return an empty string. The tags below are rebuilt from
    the parameters this function accepts. Confirm the exact attributes, styles,
    and the ``font_size`` unit against the original template.

    Args:
        name: Label placed after the icon. When falsy, only the icon is returned.
        icon: Value used as the image ``src``.
        icon_size: Width and height given to the image.
        font_size: Font size applied to the paragraph wrapping icon and label.

    Returns:
        An HTML string: a bare ``<img>`` when ``name`` is falsy, otherwise a
        ``<p>`` containing the ``<img>`` followed by ``name``.
    """
    if not name:
        return (
            f'<img style="vertical-align:middle;margin:5px 5px" src="{icon}" '
            f'width="{icon_size}" height="{icon_size}">'
        )
    return (
        f'<p style="font-size:{font_size}px">'
        f'<img style="vertical-align:middle;margin:1px 5px" src="{icon}" '
        f'width="{icon_size}" height="{icon_size}">'
        f"{name}</p>"
    )
def render_config(config, component, help_str=None, parent_key=None):
    """Render one input widget per entry of ``config`` and store the edits back.

    ``config`` is walked recursively and mutated in place:

    * ``_target_`` entries are skipped.
    * dict values, and lists whose first element is a dict, are rendered
      recursively; their labels carry the full dotted path of the parent keys.
    * other non-empty lists become a comma-separated text area; the edited
      text is split on commas, stripped, and blank items are dropped.
    * bools become a True/False radio.
    * everything else becomes a text input, masked when the last
      ``_``-separated token of the key is ``key``, ``password``, ``token`` or
      ``secret``.

    Args:
        config: Mapping to render, or ``None`` (in which case nothing happens).
        component: Object providing ``expander``, ``text_area``, ``radio`` and
            ``text_input`` (presumably a Streamlit container; verify at caller).
        help_str: Optional iterable of strings shown, newline-joined, in an
            "Info" expander.
        parent_key: Optional label prefix for this level.

    Returns:
        None.
    """
    if config is None:
        return

    prefix = "" if parent_key is None else f"{parent_key}."

    if help_str is not None:
        with component.expander("Info", False):
            help_area = "\n".join(help_str)
            st.code(f"{help_area}")

    for k, v in config.items():
        if k == "_target_":
            continue

        # Full path of this entry. It is handed down to nested levels so that
        # labels stay unique however deep the configuration is; passing only
        # `k` dropped every ancestor above the immediate parent.
        label = f"{prefix}{k}"

        if isinstance(v, dict):
            render_config(v, component, None, label)
        elif isinstance(v, list):
            if not v:
                continue
            if isinstance(v[0], dict):
                for idx, sub_element in enumerate(v):
                    render_config(sub_element, component, None, f"{label}[{idx}]")
            else:
                # str() each item so lists of numbers do not break the join.
                text_data = component.text_area(
                    label,
                    ", ".join(str(item) for item in v),
                    help="Comma separated list",
                )
                # Drop blank items so a cleared field gives [] and not [""].
                stripped = (text.strip() for text in text_data.split(","))
                config[k] = [text for text in stripped if text]
        elif isinstance(v, bool):
            options = [True, False]
            selected_option = component.radio(label, options, options.index(v))
            config[k] = bool(selected_option)
        else:
            tokens = k.split("_")
            is_secret = tokens[-1] in ("key", "password", "token", "secret")
            # NOTE(review): the empty backticks in the lookup hint suggest a
            # format placeholder was lost from this text; left unchanged here.
            hint = (
                "Enter value"
                if "lookup" not in tokens
                else "Format: `` d=day, h=hour & m=minute"
            )
            config[k] = component.text_input(
                label,
                v,
                type="password" if is_secret else "default",
                help=hint,
            )
def generate_python(generate_config):
    """Return the text of a sample Python script that runs an Obsei workflow.

    The script is a fixed template: ``generate_config`` is accepted but not
    used, so every call returns the same string.

    Args:
        generate_config: Unused.

    Returns:
        The script source, starting and ending with a newline.
    """
    script_lines = (
        "from obsei.configuration import ObseiConfiguration",
        "# This is Obsei workflow path and filename",
        'config_path = "./"',
        'config_filename = "workflow.yml"',
        "# Extract config via yaml file using `config_path` and `config_filename`",
        "obsei_configuration = ObseiConfiguration(config_path=config_path, config_filename=config_filename)",
        "# Initialize objects using configuration",
        'source_config = obsei_configuration.initialize_instance("source_config")',
        'source = obsei_configuration.initialize_instance("source")',
        'analyzer = obsei_configuration.initialize_instance("analyzer")',
        'analyzer_config = obsei_configuration.initialize_instance("analyzer_config")',
        'sink_config = obsei_configuration.initialize_instance("sink_config")',
        'sink = obsei_configuration.initialize_instance("sink")',
        "# This will fetch information from configured source ie twitter, app store etc",
        "source_response_list = source.lookup(source_config)",
        "# This will execute analyzer (Sentiment, classification etc) on source data with provided analyzer_config",
        "# Analyzer will it's output to `segmented_data` inside `analyzer_response`",
        "analyzer_response_list = analyzer.analyze_input(",
        "source_response_list=source_response_list,",
        "analyzer_config=analyzer_config",
        ")",
        "# This will send analyzed output to configure sink ie Slack, Zendesk etc",
        "sink_response_list = sink.send_data(analyzer_response_list, sink_config)",
    )
    # The template opens and closes with a newline, like the triple-quoted
    # literal it replaces.
    return "\n" + "\n".join(script_lines) + "\n"
def generate_yaml(generate_config):
    """Serialize ``generate_config`` with ``yaml.dump`` and return the result.

    Args:
        generate_config: Object to serialize.

    Returns:
        Whatever ``yaml.dump`` returns when called with this single argument.
    """
    yaml_text = yaml.dump(generate_config)
    return yaml_text
def _write_stage_log(log_components, stage, payload):
    """Write ``payload`` to the ``stage`` log widget, if log widgets were given."""
    if log_components is not None:
        log_components[stage].write(payload)


def execute_workflow(generate_config, component=None, log_components=None):
    """Run source -> analyzer -> sink for ``generate_config`` and report progress.

    Args:
        generate_config: Passed to ``ObseiConfiguration(configuration=...)``.
        component: Optional object providing ``empty()``; when truthy, a
            placeholder is created from it and used for status messages.
        log_components: Optional mapping with ``"source"``, ``"analyzer"`` and
            ``"sink"`` entries, each providing ``write()``. When ``None``, the
            per-stage output is not written anywhere. Previously the default
            ``None`` was subscripted and the call failed with ``TypeError``.

    Returns:
        None.

    Raises:
        Exception: Any error raised while building or running the workflow is
            re-raised after the failure status has been shown.
    """
    # NOTE(review): the non-ASCII glyphs in the status strings below look like
    # mis-encoded emoji. They are kept byte-for-byte; restore them from the
    # original file if available.
    progress_show = None
    if component:
        progress_show = component.empty()
        progress_show.code("πππ Processing π’π’π’")

    try:
        obsei_configuration = ObseiConfiguration(configuration=generate_config)

        source_config = obsei_configuration.initialize_instance("source_config")
        source = obsei_configuration.initialize_instance("source")
        analyzer = obsei_configuration.initialize_instance("analyzer")
        analyzer_config = obsei_configuration.initialize_instance("analyzer_config")
        sink_config = obsei_configuration.initialize_instance("sink_config")
        sink = obsei_configuration.initialize_instance("sink")

        source_response_list = source.lookup(source_config)
        _write_stage_log(
            log_components,
            "source",
            [vars(response) for response in source_response_list],
        )

        analyzer_response_list = analyzer.analyze_input(
            source_response_list=source_response_list, analyzer_config=analyzer_config
        )
        _write_stage_log(
            log_components,
            "analyzer",
            [vars(response) for response in analyzer_response_list],
        )

        sink_response_list = sink.send_data(analyzer_response_list, sink_config)
        if sink.TYPE == 'Pandas':
            # The 'Pandas' sink's result is written as-is, without vars().
            sink_payload = sink_response_list
        elif sink_response_list is not None:
            sink_payload = [vars(response) for response in sink_response_list]
        else:
            sink_payload = "No Data"
        _write_stage_log(log_components, "sink", sink_payload)

        if progress_show is not None:
            progress_show.code("πππ Processing Complete!! πΎπΎπΎ")
    except Exception as ex:
        if progress_show is not None:
            progress_show.code(f"βββ Processing Failed!! πππ \n π ({str(ex)})")
        # Bare raise re-raises the active exception with its traceback intact.
        raise