# import shutil
import os
import select
import subprocess
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import *

import streamlit as st

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from varco_arena.varco_arena_core.prompts import load_prompt
from view_utils import (
    default_page_setting,
    escape_markdown,
    set_nav_bar,
    show_linebreak_in_md,
)

VA_ROOT = Path(os.environ.get("VARCO_ARENA_RESULT_PATH", "./user_submit"))
USR_SUB = VA_ROOT.parts[-1]

import shutil

import pandas as pd

import analysis_utils as au
from view_utils import visualization
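

# Small in-memory cache kept in st.session_state: Streamlit reruns this script on
# every interaction, so cached figures/dataframes survive reruns even after the
# underlying files have been purged from disk.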
class DataCache:
    def __init__(self):
        self.cache = {}

    def store(self, key: str, data: dict):
        self.cache[key] = data

    def get(self, key: str) -> Optional[dict]:
        return self.cache.get(key)


# Initialize the cache in session state if it doesn't exist
if "data_cache" not in st.session_state:
    st.session_state.data_cache = DataCache()
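

# Everything a user uploads lands under VA_ROOT (override via the
# VARCO_ARENA_RESULT_PATH environment variable). USR_SUB, its basename, is treated
# as a reserved name: the cost-estimation form below refuses prompt names that
# collide with it. purge_user_sub_data() removes that directory tree once results
# have been cached or when a run fails.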
def purge_user_sub_data(data_path_to_purge: Union[Path, str] = None):
    if data_path_to_purge is None:
        print("nothing to purge")
        return
    else:
        shutil.rmtree(data_path_to_purge)
        print(f"purged {str(data_path_to_purge)}")
        return
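

# load_and_cache_data() reads result.json into figures/dataframes, caches them in
# session state, then purges VA_ROOT so no user data stays on disk. The cache key
# "<exp_name>/<promptname>" is inferred from a path layout that appears to be
# <upload_dir>/<exp_name>/<promptname>/result.json (parts[-3]/parts[-2] below).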
def load_and_cache_data(result_file_path: Optional[str] = None) -> Tuple[Dict, Dict]:
    """
    Load data from file, cache it in memory, then remove the file.
    Returns cached data on subsequent calls.

    Args:
        result_file_path: Path to the result JSON file

    Returns:
        Tuple of (all_result_dict, df_dict)
    """
    # Check if we already have cached data for this path
    if result_file_path:
        cache_key = str(Path(result_file_path))
        cached_data = st.session_state.data_cache.get(cache_key)
        if cached_data:
            return cached_data["all_result_dict"], cached_data["df_dict"]

    # Initialize empty dicts
    all_result_dict = {}
    df_dict = {}

    if result_file_path is not None:
        try:
            result_file_path = Path(result_file_path)
            # Read and process data
            df = pd.read_json(result_file_path)
            for col in ["tstamp", "logs"]:
                if col in df.columns:
                    df.drop(columns=[col], inplace=True)
            df = au.index_test_scenario(df)

            fig_dict_per_task = {}
            df_dict_per_task = {}

            # Process overall data
            fig_dict_per_task["Overall"] = visualization(df, is_overall=True)
            df_dict_per_task["Overall"] = df

            # Process per-task data
            for task in df["task"].unique():
                df_task = df[df["task"] == task]
                fig_dict_per_task[task] = visualization(df_task, is_overall=False)
                df_dict_per_task[task] = df_task

            # Create key from path components
            prm_name = result_file_path.parts[-2]
            exp_name = result_file_path.parts[-3]
            key = f"{exp_name}/{prm_name}"

            all_result_dict[key] = fig_dict_per_task
            df_dict[key] = df_dict_per_task

            # Store in cache before removing file
            cache_data = {"all_result_dict": all_result_dict, "df_dict": df_dict}
            st.session_state.data_cache.store(str(result_file_path), cache_data)

            # Remove user experiment directory
            purge_user_sub_data(data_path_to_purge=VA_ROOT)
        except Exception as e:
            st.error(f"Error processing data: {str(e)}")
            return {}, {}

    return all_result_dict, df_dict
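

# upload_files() saves the submitted .jsonl files under VA_ROOT/<MM-DD_HH:MM:SS>/
# (KST timestamp) and returns that directory. At least two files are required,
# presumably so there is more than one model to compare.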
def upload_files(uploaded_files) -> Path:
    # prep directory for user submission
    user_sub_root = VA_ROOT
    if user_sub_root.exists():
        if not user_sub_root.is_dir():
            raise ValueError(
                f"{user_sub_root} file exists and is not a directory. Consider renaming it."
            )
    else:
        user_sub_root.mkdir(parents=True)

    KST = timezone(timedelta(hours=9))
    tstamp = datetime.now(KST)
    tstr = tstamp.strftime("%m-%d_%H:%M:%S")
    files_dir_str = "./" + str(user_sub_root / tstr)
    files_dir = Path(files_dir_str)
    files_dir.mkdir(parents=True, exist_ok=True)

    uploaded_files = list(uploaded_files)
    if not uploaded_files:
        st.warning("❌ No files to upload. Please drag/drop or browse files to upload.")
        # purge_user_sub_data(data_path_to_purge=VA_ROOT)
    elif len(uploaded_files) < 2:
        st.error("❌ You need at least 2 jsonlines files to properly run VA.")
        purge_user_sub_data(data_path_to_purge=VA_ROOT)
    else:  # properly uploaded
        for file in uploaded_files:
            # Create a path for the file in the server directory
            file_path = files_dir / file.name

            # Save the file to the server directory
            with open(file_path, "wb") as f:
                f.write(file.getbuffer())

        jslfiles = list(files_dir.glob("*.jsonl"))
        st.success(f"✅ Successfully uploaded {len(jslfiles)} jsonl files.")

    return files_dir.resolve()
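

# run_varco_arena() shells out to varco_arena/main.py and live-streams its stdout
# into a st.code block via a non-blocking pipe. Judging from how the command is
# built below: `-c` switches main.py to cost-estimation mode, and piping `yes |`
# into a real run auto-confirms its interactive prompt.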
def run_varco_arena(
    price_estimation: bool = False,
    # upload_dir: Union[str, Path] = None,
    promptname: str = None,
    exp_name: str = None,
    api_key: Optional[str] = None,
    evaluation_model: str = "gpt-4o-mini",
    update_interval: float = 1.0,
):
    # Use environment variable for API key
    ptn = f"{str(st.session_state.upfiles_dir)}"
    outdir = Path(ptn)
    if exp_name:
        outdir = outdir / exp_name

    command = f"python varco_arena/main.py -i {ptn} -o {outdir} -k {api_key} -p {promptname} -e {evaluation_model} -j 64"
    if price_estimation:
        command = f"{command} -c"
    else:
        command = command.replace("python", "yes | python ")
    print(command)

    api_key = None  # clear immediately

    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        stdin=subprocess.PIPE,
        text=True,
        bufsize=1,
        shell=True,
    )
    # Set stdout and stdin to non-blocking mode
    os.set_blocking(process.stdout.fileno(), False)

    last_update_time = time.time()
    terminal_output = st.empty()
    full_output = f"{command}\n"
    to_show = full_output
    while True:
        # Check if we have output to read
        if select.select([process.stdout], [], [], 0)[0]:
            output = process.stdout.readline()
            if output:
                full_output += output
                if price_estimation:
                    to_show = full_output
                    terminal_output.code(to_show, language="bash")
                else:
                    current_time = time.time()
                    if current_time - last_update_time > update_interval:
                        lines = full_output.split("\n")
                        if len(lines) < 5:
                            to_show = full_output
                        else:
                            to_show = "\n".join(["...\n..\n.\n"] + lines[-5:])
                        terminal_output.code(to_show, language="bash")
                        last_update_time = current_time
                print(output)
                time.sleep(0.1)

        # Check if the process has finished
        if process.poll() is not None:
            # Read any remaining output
            os.set_blocking(process.stdout.fileno(), True)
            remaining_output = process.stdout.read()
            if remaining_output:
                lines = remaining_output.split("\n")
                if len(lines) > 10:
                    to_show += "\n".join(["\n...\n..\n.\n"] + lines[-10:])
                else:
                    to_show += remaining_output
                terminal_output.code(to_show, language="bash")
                print(remaining_output)
            break

    return_code = process.poll()
    return outdir, return_code
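

# main() wires the UI flow: upload .jsonl responses -> estimate cost with a dummy
# key -> run the arena with the user's OpenAI key -> cache the results and purge
# the uploads.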
def main():
    # init lang
    st.session_state["korean"] = st.session_state.get("korean", False)

    sidebar_placeholder = default_page_setting(layout="wide")
    set_nav_bar(
        False, sidebar_placeholder=sidebar_placeholder, toggle_hashstr="app_init"
    )

    st.title("⚔️ Arena-Lite ⚔️")
    if st.session_state.korean:
        st.write(
            """**Arena-Lite는 테스트셋 명령어별로 비교할 모델(생성문)의 토너먼트를 수행하고 결과들을 종합하여 모델들의 순위를 매기는 벤치마킹 시스템입니다. 이것은 reference 아웃풋과 비교하여 승률을 매기는 방법보다 정확하며 더 저렴합니다.**
모범답안을 필요로 하지 않으므로 커스텀 테스트셋 (50+ 행) 을 활용하는 경우 편리한 벤치마킹이 가능합니다."""
        )
    else:
        st.write(
            """**Arena-Lite is an LLM benchmarking system that compares model responses across customized test scenarios (recommend >50 prompts) without requiring reference answers.**
Arena-Lite runs a tournament among the models under comparison for each test-set prompt and aggregates the results to rank the models accurately at an affordable price. This is more accurate and more cost-effective than judging win rates against reference outputs."""
        )
    st.divider()
    # Set up the file uploader
    if st.session_state.korean:
        st.markdown("### 1. 모델 출력파일 업로드")
    else:
        st.markdown("### 1. Upload LLM responses")
    uploaded_files = st.file_uploader(
        "Drag and Drop jsonlines files (.jsonl)", accept_multiple_files=True
    )
    if st.session_state.korean:
        st.info("업로드 하신 파일은 자동으로 삭제되며 수집되거나 사용되지 않습니다.\n- [입력 예시 파일 (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)")
    else:
        st.info(
            "Your uploads will be removed automatically and will not be collected or reused for any purpose.\n- [Example input files (*.jsonl)](https://huggingface.co/spaces/NCSOFT/VARCO_Arena/tree/main/varco_arena/rsc/inputs_for_dbg/dbg_llmbar_brief_inputs)"
        )

    # upload state
    if "upfiles_dir" not in st.session_state:
        st.session_state.upfiles_dir = None
    if st.button("업로드하기" if st.session_state.korean else "Upload Files"):
        st.session_state.upfiles_dir = upload_files(uploaded_files)
    if st.button("업로드한 파일 지우기" if st.session_state.korean else "Purge my uploads"):
        st.session_state.upfiles_dir = None
        if VA_ROOT.is_dir():
            shutil.rmtree(VA_ROOT)
            st.success(
                "✅ 업로드한 파일을 서버에서 지웠습니다"
                if st.session_state.korean
                else "✅ Removed your uploads from the server successfully"
            )
        else:
            st.error(
                "❌ 지울 파일이 없습니다"
                if st.session_state.korean
                else "❌ You have nothing uploaded"
            )

    if st.session_state.korean:
        with st.expander("❓❓ 무엇을 업로드 하나요❓❓"):
            st.info(open("guide_mds/input_jsonls_kr.md", encoding="UTF8").read())
    else:
        with st.expander("❓❓ What should I upload ❓❓"):
            st.info(open("guide_mds/input_jsonls_en.md", encoding="UTF8").read())

    # Form for cost estimation
    with st.form("cost_estimation_form"):
        if st.session_state.korean:
            st.write("### 2. 가격 산정")
        else:
            st.write("### 2. Cost Estimation")
        eval_model = st.selectbox(
            "Select Judge",
            open("eval_models_list.txt", encoding="UTF8").read().split("\n"),
        )
        promptname = st.selectbox(
            "Select Evaluation Prompt",
            open("eval_prompt_list.txt", encoding="UTF8").read().split("\n"),
        )
        if st.session_state.korean:
            st.markdown("*`llmbar` 외 다른 프롬프트는 충분히 검증된 프롬프트는 아닙니다. (동작은 함)")
        else:
            st.markdown(
                "*Eval prompts other than `llmbar` are working examples, not the optimal ones."
            )
        if promptname == USR_SUB:
            raise ValueError(
                f"{USR_SUB=} is a name reserved for the system. Consider another name for the prompt or consider changing {VA_ROOT=} (USR_SUB == VA_ROOT.parts[-1])."
            )
        estimate_button = st.form_submit_button("Calculate Cost!")
        with st.expander(
            "LLM Judge에 활용되는 프롬프트 (`Calculate Cost!` 클릭시 갱신)"
            if st.session_state.korean
            else "**Evaluation Prompt for LLM Judge (will refresh after `Calculate Cost!` is clicked)**"
        ):
            prompt = load_prompt(promptname, task="-")
            kwargs = dict(
                inst="{inst}",
                src="{src}",
                out_a="{out_a}",
                out_b="{out_b}",
                task="-",
            )
            if promptname == "translation_pair":
                kwargs["source_lang"] = "{source_lang}"
                kwargs["target_lang"] = "{target_lang}"
            prompt_cmpl = prompt.complete_prompt(**kwargs)
            st.markdown(f"### Evaluation Prompt: {promptname}")
            for msg in prompt_cmpl:
                st.markdown(f"**{msg['role']}**")
                st.info(show_linebreak_in_md(escape_markdown(msg["content"])))
        if estimate_button:
            if st.session_state.get("upfiles_dir") is None:
                st.error(
                    "❌ Requirements: You have to upload jsonlines files first to proceed"
                )
            else:
                st.markdown("##### Estimated Cost")
                dummy_api_key = "dummy"
                dummy_exp_name = "dummy"
                result_file_path, return_code = run_varco_arena(
                    # upload_dir=st.session_state.upfiles_dir,
                    promptname=promptname,
                    api_key=dummy_api_key,
                    exp_name=dummy_exp_name,
                    price_estimation=True,
                    evaluation_model=eval_model,
                )
                if return_code:
                    st.error(
                        "❌ RuntimeError: An error occurred during cost estimation. **Restart from file upload!**"
                    )
                    purge_user_sub_data(data_path_to_purge=VA_ROOT)
                else:
                    st.success("✅ Cost estimation completed successfully")
                    st.session_state.cost_estimated = True

    # Form for actual run
    with st.form("run_arena_form"):
        if st.session_state.korean:
            st.write("### 3. Arena-Lite 구동하기")
        else:
            st.write("### 3. Run Arena-Lite")
        api_key = st.text_input("Enter your OpenAI API Key", type="password")

        # demo exp name fixated
        KST = timezone(timedelta(hours=9))
        tstamp = datetime.now(KST)
        tstr = tstamp.strftime("%m-%d_%H:%M:%S")
        exp_name = f"{tstr}_KST_submit"

        if st.session_state.korean:
            st.write("**주의**: `Ctrl+C` 버튼은 구현되지 않았습니다. 구동 전 알고 해주세요.")
        else:
            st.write("**Caution: `Ctrl+C` button hasn't been implemented.**")
        run_button = st.form_submit_button(
            "🔥 Run Arena!",
            disabled=(not st.session_state.get("cost_estimated", False))
            or "result_file_path"
            in st.session_state.keys(),  # run already performed once
        )
        if run_button:
            set_nav_bar(
                True,
                sidebar_placeholder=sidebar_placeholder,
                toggle_hashstr="app_during_run",
            )
            if st.session_state.get("upfiles_dir") is None:
                st.error(
                    "❌ Requirements: You have to upload jsonlines files first to proceed"
                )
            elif not api_key:
                st.error("❌ Requirements: OpenAI key required to run VA.")
            else:
                result_file_path, return_code = run_varco_arena(
                    # upload_dir=st.session_state.upfiles_dir,
                    promptname=promptname,
                    api_key=api_key,
                    exp_name=exp_name,
                    price_estimation=False,
                    evaluation_model=eval_model,
                )
                if return_code:
                    st.error(
                        "❌ RuntimeError: An error occurred during the Arena-Lite run. Check the files and **restart from file upload!**"
                    )
                    purge_user_sub_data(data_path_to_purge=VA_ROOT)
                else:
                    st.success("✅ Arena-Lite run completed successfully")
                    st.session_state.result_file_path = list(
                        result_file_path.glob("**/result.json")
                    )[-1]
            set_nav_bar(
                False,
                sidebar_placeholder=sidebar_placeholder,
                toggle_hashstr="app_run_done",
            )

    if st.session_state.get("result_file_path", None) is not None:
        print(f"{st.session_state.get('result_file_path', None)=}")
        load_and_cache_data(result_file_path=str(st.session_state.result_file_path))


if __name__ == "__main__":
    main()
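
# To run the app locally (assuming this file is the Space's Streamlit entrypoint,
# e.g. app.py):
#   streamlit run app.py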