"""Streamlit front end for the HangerAI description & advertisement generator.

The page lets the user pick a language, a task and a few free-text options,
optionally upload a product image that is sent to a remote tagging API, and
then asks a Socket.IO backend to generate the requested text.
"""

import copy
import json
import logging
import os
import socket
import time
from pathlib import Path
from random import randrange

import coloredlogs
import cv2
import numpy as np
import requests
import streamlit as st
from colorama import Fore, Back, Style
from streamlit_option_menu import option_menu

from src.styles.menu_styles import FOOTER_STYLES, HEADER_STYLES
from src.utils.lang import en, vi, jpn, chn, kr
from src.utils.footer import show_info
from src.utils.helpers import get_random_img, get_files_in_dir, process_uploaded_file, \
    image_to_base64, get_last_infos, get_last_options, check_options_changed
from socket_client import sio as client
from socket_client import END_OF_SENTENCE

coloredlogs.install(
    level="INFO",
    fmt="%(asctime)s %(name)s %(funcName)s() %(filename)s %(lineno)d %(levelname)-8s %(message)s",
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- TAGGING API SETTINGS ---
TAGGING_API_URL: str = "https://api.runpod.ai/v2/i0uiwbhb3vy4ad/runsync"
# Upper bound (seconds) for one tagging request so a stalled endpoint cannot
# hang the Streamlit script forever.
TAGGING_API_TIMEOUT_SECONDS: float = 120
# NOTE(review): this key is committed to source control. It is kept only as a
# fallback so existing deployments keep working; rotate it and provide the
# real key through the RUNPOD_API_KEY environment variable instead.
_FALLBACK_TAGGING_API_KEY: str = "A17AT3LFUCCJD117TXTXHFN33FNYS3AR5AA8VT3W"


def call_tagging_api(img64, lang, force_apply_category,
                     timeout: float | None = TAGGING_API_TIMEOUT_SECONDS):
    """POST an image to the tagging endpoint and return the raw response.

    Args:
        img64: Image payload sent as the ``img64`` input field (callers in
            this file pass the output of ``image_to_base64``).
        lang: Value sent as the ``lang`` input field.
        force_apply_category: Value sent as the ``force_apply_category``
            input field; forwarded unchanged.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error. ``None`` waits indefinitely.

    Returns:
        The ``requests.Response`` object; the caller checks the status code.
    """
    payload = json.dumps({
        "input": {
            "lang": lang,
            "force_apply_category": force_apply_category,
            "img64": img64,
        }
    })
    headers = {
        "Authorization": os.environ.get("RUNPOD_API_KEY", _FALLBACK_TAGGING_API_KEY),
        "Content-Type": "application/json",
    }
    return requests.request(
        "POST", TAGGING_API_URL, headers=headers, data=payload, timeout=timeout
    )


# --- PATH SETTINGS ---
current_dir: Path = Path(__file__).parent if "__file__" in locals() else Path.cwd()
css_file: Path = current_dir / "src/styles/.css"

# --- API ADDRESS ---
# api_address = "http://127.0.0.1:5050"
username = "SalftCoffee1"

# --- GENERAL SETTINGS ---
PAGE_TITLE: str = "HangerAI Description & Advertisement"
PAGE_ICON: str = "🛍️"
LANG_EN: str = "En"
LANG_VI: str = "Vi"
LANG_JPN: str = "Jpn"
LANG_CHN: str = "Chn"
LANG_KR: str = "Kr"
FORMAT_MAPPING = {
    "jpg": "JPEG",
    "JPG": "JPEG",
    "jpeg": "JPEG",
    "JPEG": "JPEG",
    "png": "PNG",
    "PNG": "PNG"
}

# Tasks offered in the sidebar; each must be a key of task_holders.json.
TASK_OPTIONS = (
    "seo_article",
    "seo_article_outline",
    "tiktok_caption",
    "youtube_caption",
    "instagram_ad",
    "linkedin_ad",
    "advertising_campaign_idea",
    "gen_title_from_article",
    "rewrite",
    "translate",
    "extend_article",
    "reduce_article",
    "key_informations",
    "facebook_ad_long_form_outline",
    "blog_ad_outline",
    "description",
    "blog_ad",
    "facebook_ad_long_form",
    "facebook_ad_short_form",
)

# Writing tones offered when the selected task supports a tone.
TONE_OPTIONS = ("academic", "casual", "formal", "funny", "luxury")

st.set_page_config(page_title=PAGE_TITLE, page_icon=PAGE_ICON, layout="wide")

# --- LOAD CSS ---
# The stylesheet has to be wrapped in a <style> element to take effect; the
# previous code opened the file but rendered an empty string.
with open(css_file) as f:
    st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)

selected_lang = option_menu(
    menu_title=None,
    options=[LANG_EN, LANG_VI, LANG_JPN, LANG_CHN, LANG_KR],
    icons=["globe2", "translate"],
    menu_icon="cast",
    default_index=0,
    orientation="horizontal",
    styles=HEADER_STYLES
)

# Storing The Context
if "locale" not in st.session_state:
    st.session_state.locale = en
if "seed" not in st.session_state:
    st.session_state.seed = randrange(10**3)


def get_radio_names(infos):
    """Return numbered labels ("1. <category>", ...) for a list of item infos.

    The category is read from the ``"item category"`` key when the active
    locale's ``lang_code`` is ``"en"`` and from ``"loại sản phẩm"`` otherwise.
    """
    category_key = (
        "item category" if st.session_state.locale.lang_code == "en" else "loại sản phẩm"
    )
    return [f"{i}. {obj[category_key]}" for i, obj in enumerate(infos, start=1)]


def mapping_cate_infos(info, mapper):
    """Translate the keys and values of ``info`` through ``mapper``.

    Lists are translated element by element, nested dicts recursively, and
    every other value is looked up directly.

    Raises:
        KeyError: If a key or value is missing from ``mapper``.
    """
    translated = {}
    for key, value in info.items():
        if isinstance(value, list):
            value = [mapper[item] for item in value]
        elif isinstance(value, dict):
            value = mapping_cate_infos(value, mapper)
        else:
            value = mapper[value]
        translated[mapper[key]] = value
    return translated


def _optional_sidebar_field(task_holder, holder_key, widget, label_attr):
    """Render a sidebar text widget only if the task declares ``holder_key``.

    The label is read lazily from the active locale so that a locale attribute
    is only required when the widget is actually shown. Returns the widget
    value, or ``""`` when the task does not use the field.
    """
    if holder_key not in task_holder:
        return ""
    return widget(getattr(st.session_state.locale, label_attr))


def _collect_generated_text(poll_interval: float = 0.01) -> str:
    """Drain ``socket_client.chunk_list`` until the end-of-sentence marker.

    Each received chunk is appended as ``" " + chunk`` (so the result starts
    with a space, as before). While the list is empty the loop sleeps briefly
    instead of spinning at full CPU.
    """
    # Looked up on the module each time, like the original in-loop import, so
    # a reassigned ``chunk_list`` is still picked up.
    import socket_client

    pieces = []
    while True:
        chunks = socket_client.chunk_list
        if not chunks:
            time.sleep(poll_interval)
            continue
        next_text = chunks.pop(0)
        if next_text == END_OF_SENTENCE:
            break
        pieces.append(next_text)
    return "".join(" " + piece for piece in pieces)


def _generate_content(process_options) -> str:
    """Ask the backend to run the task, display the result and return it."""
    start = time.time()
    # Run process in background
    client.call("start_process", process_options)
    complete_text = _collect_generated_text()
    st.write()
    st.write("-" * 10)
    st.write()
    st.write(complete_text)
    end = time.time()
    logger.info(f" - Content generation Time {end - start:.4f}")
    return complete_text


def _load_item_infos(filename, image_pil, image_format):
    """Return tagging results for the image, using the on-disk cache if possible.

    When ``filename`` matches the ``"name"`` stored by ``get_last_infos()`` the
    cached result is returned. Otherwise the image is sent to the tagging API
    and the response is written to ``cache_infos.json``.

    Raises:
        RuntimeError: If the tagging API does not answer with HTTP 200.
    """
    result = get_last_infos()
    if filename == result.get("name", ""):
        logger.info("Load last infos of the image")
        return result

    img64 = image_to_base64(image_pil, image_format)
    start = time.time()
    response = call_tagging_api(
        img64=img64,
        lang="en",  # TODO: only "en" and "vi" are currently supported
        force_apply_category=None
    )
    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if response.status_code != 200:
        raise RuntimeError(f"Tagging API returned HTTP {response.status_code}")
    result = response.json()
    end = time.time()
    logger.info(f" - Tag Extract Time {end - start:.4f}")

    # Save to cache_infos
    result["name"] = filename
    logger.info(f"Item infos: {result}")
    with open("cache_infos.json", "w") as f:
        json.dump(result, f, indent=4)
    return result


def _run_image_task(task_option, process_options, last_options):
    """Handle tasks that need an uploaded image (upload, tagging, generation)."""
    col1, col2 = st.columns([3, 7])
    infos = {}
    # Defaults for the "nothing uploaded yet" state, so the code below never
    # touches unbound names.
    result = None
    select_generate = False

    with col1:
        st.file_uploader(st.session_state.locale.upload_label, key="uploaded_file")
        uploaded_file = st.session_state.uploaded_file
        if uploaded_file is not None:
            filename = uploaded_file.name
            extension = filename.split(".")[-1]
            image_format = FORMAT_MAPPING.get(extension) or FORMAT_MAPPING.get(extension.lower())
            if image_format is None:
                st.error(f"Unsupported image format: {extension}")
                return

            image_pil = process_uploaded_file(uploaded_file)
            image_np = np.array(image_pil)
            result = _load_item_infos(filename, image_pil, image_format)
            infos = result["output"]["infos"]

            st.image(image_np, use_column_width=True)
            _, c2, _ = st.columns(3)
            select_generate = c2.button("Start generate")

            # Show attributes
            for k, v in infos.items():
                st.write(f"**{k}**: {v}")

        # Setup options
        options = copy.deepcopy(process_options)
        if result is not None:
            options["cached_infos"] = result

        # Check options changed
        options_changed = check_options_changed(last_options, options)

        # NOTE(review): this string carries no HTML although it is rendered
        # with unsafe_allow_html; a divider tag may have been lost - confirm.
        split_line = "\n"
        st.markdown(split_line, unsafe_allow_html=True)

    with col2:
        # Visualize description
        st.write("**Description:**")
        st.write()
        if result is not None:
            st.write(result["output"]["description"])
        st.write()
        st.write("-" * 10)
        st.write()

        if len(infos):
            target_info = copy.deepcopy(infos)
            # add more informations if not in description task
            if task_option != "description":
                # Mapping category name to eng, vi version
                with open(st.session_state.locale.mapping_target_file, "r", encoding="utf-8") as f:
                    mapper = json.load(f)
                # Add other informations to target info if given
                for k, v in mapper.items():
                    extra = options["more_options"].get(k, "").strip()
                    if extra != "":
                        target_info[v] = extra
            process_options["item_info"] = target_info

        logger.info(f"process options: {process_options}")

        if select_generate:
            if not options_changed:
                logger.info("use last options content")
                # ``options`` is a fresh copy of ``process_options`` and
                # normally has no "content"; fall back to the cached options.
                # Presumably ``last_options`` is the dict stored in
                # cache_options.json - verify against get_last_options().
                cached = last_options if isinstance(last_options, dict) else {}
                content = options.get("content", cached.get("content", ""))
                st.write(content)
            else:
                complete_text = _generate_content(process_options)
                # Save options after generate
                options["content"] = complete_text
                with open("cache_options.json", "w") as f:
                    json.dump(options, f, indent=4)


def _run_text_task(process_options):
    """Handle tasks that do not need an uploaded image."""
    _, c2, _ = st.columns(3)
    select_generate = c2.button("Start generate")
    process_options["item_info"] = ""  # Do not need in here
    if select_generate:
        _generate_content(process_options)


def _run_generation_page():
    """Render the sidebar options and run the selected task.

    Expects the Socket.IO client to be connected already.
    """
    reset_env_dev = st.sidebar.button("Reset dev prompts")

    # Reset env dev prompts
    if reset_env_dev:
        logger.info("Reseting prompts from dev environment...")
        client.call("reset_dev")
        logger.info("Reset prompts from dev environment successfully!")

    st.sidebar.write("Options")
    st.sidebar.write("------------------------------------")

    # Environment option
    env_option = st.sidebar.selectbox(
        st.session_state.locale.environ_label,
        ("dev", "prod")
    )
    st.sidebar.write("\n")

    # Task option
    task_option = st.sidebar.selectbox(
        st.session_state.locale.task_selection_label,
        TASK_OPTIONS
    )
    st.sidebar.write("\n")

    # Get task holder
    with open("src/utils/task_holders.json", "r", encoding="utf-8") as f:
        task_holders = json.load(f)
    task_holder = task_holders[task_option]
    logger.info(f"Task holder: {task_holder}")

    owner_name = _optional_sidebar_field(
        task_holder, "owner_name", st.sidebar.text_input, "owner_label")
    item_name = _optional_sidebar_field(
        task_holder, "item_name", st.sidebar.text_input, "item_name_label")
    brand_name = _optional_sidebar_field(
        task_holder, "brand_name", st.sidebar.text_input, "brand_label")
    promotion_info = _optional_sidebar_field(
        task_holder, "promotion_info", st.sidebar.text_area, "promotion_label")
    more_details_info = _optional_sidebar_field(
        task_holder, "more_details_info", st.sidebar.text_area, "more_details_label")
    article_content = _optional_sidebar_field(
        task_holder, "article_content", st.sidebar.text_area, "article_content")
    st.sidebar.write("\n")

    # Writing tone option
    if "tone_option" in task_holder:
        tone_option = st.sidebar.selectbox(
            st.session_state.locale.tone_selection_label,
            TONE_OPTIONS
        )
    else:
        tone_option = ""
    st.sidebar.write("\n")

    # Load history options
    last_options = get_last_options()

    # Setup options
    process_options = {
        "env": env_option,
        "lang": st.session_state.locale.lang_code,
        "task": task_option,
        "tone": tone_option,
        "more_options": {
            "owner": owner_name,
            "item_name": item_name,
            "brand": brand_name,
            "promotion": promotion_info,
            "more_details": more_details_info
        },
        "article_content": article_content
    }

    if "need_image_upload" in task_holder:
        _run_image_task(task_option, process_options, last_options)
    else:
        _run_text_task(process_options)


def main():
    """Connect to the generation backend and render the generation page.

    Nothing is rendered beyond the address field until an API address has
    been entered, because every action on the page needs the connection. The
    client is always disconnected afterwards, even when rendering fails, so
    the next Streamlit rerun can connect again.
    """
    api_address = st.sidebar.text_input(
        "api address"
    )
    if api_address.strip() == "":
        st.info("Enter the API address in the sidebar to connect to the generation server.")
        return

    client.connect(api_address, headers={"X-Username": username})
    try:
        _run_generation_page()
    finally:
        client.disconnect()


def run_agi():
    """Apply the selected language, draw the title and dispatch the footer menu."""
    locales = {
        LANG_EN: en,
        LANG_VI: vi,
        LANG_JPN: jpn,
        LANG_CHN: chn,
        LANG_KR: kr,
    }
    # Unknown selections fall back to English.
    st.session_state.locale = locales.get(selected_lang, en)

    # NOTE(review): rendered with unsafe_allow_html but contains no HTML; the
    # heading markup may have been lost - confirm.
    st.markdown(f"\n\n{st.session_state.locale.title}\n\n", unsafe_allow_html=True)

    selected_footer = option_menu(
        menu_title=None,
        options=[
            st.session_state.locale.footer_option1,
            st.session_state.locale.footer_option0,
        ],
        icons=["info-circle", "chat-square-text"],  # https://icons.getbootstrap.com/
        menu_icon="cast",
        default_index=0,
        orientation="horizontal",
        styles=FOOTER_STYLES
    )

    # footer_option0 opens the generator; anything else shows the info page.
    if selected_footer == st.session_state.locale.footer_option0:
        main()
    else:
        show_info()


if __name__ == "__main__":
    run_agi()