import base64
import logging
import pathlib
import re
import sys
import uuid

import streamlit as st
import yaml
from obsei.configuration import ObseiConfiguration

logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)


def img_to_bytes(img_path):
    """Return the contents of the file at *img_path* as a base64 text string."""
    img_bytes = pathlib.Path(img_path).read_bytes()
    return base64.b64encode(img_bytes).decode()


# Copied from https://github.com/jrieke/traingenerator/blob/main/app/utils.py
def download_button(
    object_to_download, download_filename, button_text  # , pickle_it=False
):
    """Render *button_text* through ``st.markdown`` with raw HTML enabled.

    Args:
        object_to_download: Payload to base64-encode; either a ``str`` (encoded
            to bytes first) or a bytes-like object.
        download_filename: Not referenced by the visible code.
        button_text: Text emitted in the rendered markdown.

    NOTE(review): ``b64``, ``button_id`` and ``download_filename`` are computed
    or accepted but never interpolated into the emitted markup, and
    ``custom_css`` is blank. The anchor/style markup looks like it is missing
    from these literals - confirm against the upstream helper linked above.
    """
    try:
        # some strings <-> bytes conversions necessary here
        b64 = base64.b64encode(object_to_download.encode()).decode()
    except AttributeError:
        # No ``.encode`` attribute: treat the payload as already bytes-like.
        b64 = base64.b64encode(object_to_download).decode()

    button_uuid = str(uuid.uuid4()).replace("-", "")
    # Raw string: "\d" in a plain literal is an invalid escape sequence.
    button_id = re.sub(r"\d+", "", button_uuid)

    custom_css = f""" """
    dl_link = custom_css + f"{button_text}\n\n"

    st.markdown(dl_link, unsafe_allow_html=True)


def get_obsei_config(current_path, file_name):
    """Load the Obsei configuration found at *current_path* / *file_name*.

    Returns the ``configuration`` attribute of the ``ObseiConfiguration``
    built from those two values.
    """
    return ObseiConfiguration(
        config_path=current_path,
        config_filename=file_name,
    ).configuration


@st.cache
def get_icon_name(name, icon, icon_size=40, font_size=1):
    """Return the text snippet used to display *name*.

    An empty string is returned when *name* is falsy; otherwise *name* wrapped
    in blank lines.

    NOTE(review): ``icon``, ``icon_size`` and ``font_size`` are not used by the
    visible code - the icon markup appears to be missing from the literals.
    """
    if not name:
        return ""
    return f"\n\n{name}\n\n"


def render_config(config, component, help_str=None, parent_key=None):
    """Render editable widgets for *config* and write the edits back in place.

    Args:
        config: Mapping of option name to value; mutated with the widget
            values. ``None`` is a no-op.
        component: Streamlit container exposing ``expander``, ``text_area``,
            ``radio`` and ``text_input``.
        help_str: Optional iterable of strings, joined with newlines and shown
            in an "Info" expander.
        parent_key: Dotted path of the enclosing option, used only to prefix
            widget labels.

    Rendering rules, by value type:
        * ``_target_`` entries are skipped.
        * dicts, and lists whose first item is a dict, are rendered
          recursively.
        * other non-empty lists become a comma separated text area.
        * bools become a True/False radio.
        * everything else becomes a text input, masked when the last
          ``_``-separated token of the key is key/password/token/secret.
    """
    if config is None:
        return

    prefix = "" if parent_key is None else f"{parent_key}."

    if help_str is not None:
        with component.expander("Info", False):
            help_area = "\n".join(help_str)
            st.code(f"{help_area}")

    for k, v in config.items():
        if k == "_target_":
            continue

        if isinstance(v, dict):
            # Pass the full dotted path down so labels of deeply nested options
            # keep every ancestor and stay distinct from one another.
            render_config(v, component, None, f"{prefix}{k}")
        elif isinstance(v, list):
            if not v:
                continue
            if isinstance(v[0], dict):
                for idx, sub_element in enumerate(v):
                    render_config(
                        sub_element, component, None, f"{prefix}{k}[{idx}]"
                    )
            else:
                text_data = component.text_area(
                    f"{prefix}{k}", ", ".join(v), help="Comma separated list"
                )
                config[k] = [text.strip() for text in text_data.split(",")]
        elif isinstance(v, bool):
            # Must be tested after dict/list but before the generic text input.
            options = [True, False]
            selected_option = component.radio(
                f"{prefix}{k}", options, options.index(v)
            )
            config[k] = bool(selected_option)
        else:
            tokens = k.split("_")
            is_secret = tokens[-1] in ["key", "password", "token", "secret"]
            # NOTE(review): the empty `` in the lookup hint looks like a
            # placeholder whose content is missing - confirm.
            hint = (
                "Enter value"
                if "lookup" not in tokens
                else "Format: `` d=day, h=hour & m=minute"
            )
            config[k] = component.text_input(
                f"{prefix}{k}",
                v,
                type="password" if is_secret else "default",
                help=hint,
            )


def generate_python(generate_config):
    """Return the source of a sample script that runs an Obsei workflow.

    *generate_config* is accepted for signature parity with ``generate_yaml``
    but is not used: the script reads ``workflow.yml`` from disk.

    The template is kept one statement per line so the emitted text is valid
    Python (its ``#`` comments would otherwise swallow the code after them).
    """
    return """
from obsei.configuration import ObseiConfiguration

# This is Obsei workflow path and filename
config_path = "./"
config_filename = "workflow.yml"

# Extract config via yaml file using `config_path` and `config_filename`
obsei_configuration = ObseiConfiguration(config_path=config_path, config_filename=config_filename)

# Initialize objects using configuration
source_config = obsei_configuration.initialize_instance("source_config")
source = obsei_configuration.initialize_instance("source")
analyzer = obsei_configuration.initialize_instance("analyzer")
analyzer_config = obsei_configuration.initialize_instance("analyzer_config")
sink_config = obsei_configuration.initialize_instance("sink_config")
sink = obsei_configuration.initialize_instance("sink")

# This will fetch information from configured source ie twitter, app store etc
source_response_list = source.lookup(source_config)

# This will execute analyzer (Sentiment, classification etc) on source data with provided analyzer_config
# Analyzer will it's output to `segmented_data` inside `analyzer_response`
analyzer_response_list = analyzer.analyze_input(
    source_response_list=source_response_list,
    analyzer_config=analyzer_config
)

# This will send analyzed output to configure sink ie Slack, Zendesk etc
sink_response_list = sink.send_data(analyzer_response_list, sink_config)
"""


def generate_yaml(generate_config):
    """Serialize *generate_config* to a YAML string with ``yaml.dump``."""
    return yaml.dump(generate_config)


def execute_workflow(generate_config, component=None, log_components=None):
    """Run the source -> analyzer -> sink workflow described by *generate_config*.

    Args:
        generate_config: Configuration passed to ``ObseiConfiguration``.
        component: Optional Streamlit container; when given, a placeholder in
            it shows processing / complete / failed status.
        log_components: Optional mapping with ``"source"``, ``"analyzer"`` and
            ``"sink"`` entries, each exposing ``write``. When ``None`` the
            intermediate results are not logged.

    Raises:
        Exception: Any error raised while building or running the workflow is
            shown in the status placeholder (if any) and re-raised.

    NOTE(review): the emoji in the status strings look mis-encoded; they are
    reproduced unchanged here - confirm the intended characters.
    """
    progress_show = None
    if component:
        progress_show = component.empty()
        progress_show.code("πŸ„πŸ„πŸ„ Processing 🐒🐒🐒")

    # ``log_components`` defaults to None; subscripting it unconditionally
    # would raise TypeError, so logging is simply skipped in that case.
    should_log = log_components is not None

    try:
        obsei_configuration = ObseiConfiguration(configuration=generate_config)

        source_config = obsei_configuration.initialize_instance("source_config")
        source = obsei_configuration.initialize_instance("source")

        analyzer = obsei_configuration.initialize_instance("analyzer")
        analyzer_config = obsei_configuration.initialize_instance("analyzer_config")

        sink_config = obsei_configuration.initialize_instance("sink_config")
        sink = obsei_configuration.initialize_instance("sink")

        source_response_list = source.lookup(source_config)
        if should_log:
            log_components["source"].write(
                [vars(response) for response in source_response_list]
            )

        analyzer_response_list = analyzer.analyze_input(
            source_response_list=source_response_list,
            analyzer_config=analyzer_config,
        )
        if should_log:
            log_components["analyzer"].write(
                [vars(response) for response in analyzer_response_list]
            )

        sink_response_list = sink.send_data(analyzer_response_list, sink_config)
        if should_log:
            if sink.TYPE == 'Pandas':
                # The Pandas sink's response is written as-is, not via vars().
                log_components["sink"].write(sink_response_list)
            elif sink_response_list is not None:
                log_components["sink"].write(
                    [vars(response) for response in sink_response_list]
                )
            else:
                log_components["sink"].write("No Data")

        if progress_show:
            progress_show.code("πŸŽ‰πŸŽ‰πŸŽ‰ Processing Complete!! 🍾🍾🍾")
    except Exception as ex:
        if progress_show:
            progress_show.code(f"❗❗❗ Processing Failed!! 😞😞😞 \n πŸ‘‰ ({str(ex)})")
        # Bare raise keeps the original traceback intact.
        raise