"""File-backed storage helpers for a per-dimension model leaderboard.

Published results are stored in ``all_dimensions/<dimension>.jsonl`` and
pending submissions are queued in ``temp/<dimension>.jsonl`` until
``refresh()`` merges them into the published files. All paths are relative
to the current working directory.
"""

import json
import os
import random

import pandas as pd

dimensions = ['Audience', 'Keyword', 'Format', 'Language', 'Length', 'Source']

# Metric fields every per-dimension record carries, in output column order.
_METRIC_KEYS = (
    "WISE",
    "SICR",
    "nDCG@10(Original)",
    "nDCG@10(Instructed)",
    "nDCG@10(Reversely Instructed)",
    "MRR@1(Original)",
    "MRR@1(Instructed)",
    "MRR@1(Reversely Instructed)",
)

_LEADERBOARD_DIR = "all_dimensions"
_QUEUE_DIR = "temp"


def _leaderboard_path(dimension: str) -> str:
    """Return the path of the published results file for *dimension*."""
    return f"{_LEADERBOARD_DIR}/{dimension}.jsonl"


def _queue_path(dimension: str) -> str:
    """Return the path of the pending-submission file for *dimension*."""
    return f"{_QUEUE_DIR}/{dimension}.jsonl"


def _read_jsonl(path: str) -> list:
    """Read a JSON Lines file into a list of objects, skipping blank lines."""
    with open(path, "r", encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


def _write_jsonl(path: str, rows: list) -> None:
    """Overwrite *path* with one JSON document per line."""
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(row) + "\n" for row in rows)


def _is_number(value) -> bool:
    """Return True for int/float values; bool is excluded on purpose."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _queued_model_name(data: dict) -> str:
    """Return the model label that ``submit`` stores for a submission dict.

    Shared by ``submit`` and ``check_json_file`` so that the duplicate check
    compares exactly the value that was written to the queue.
    """
    name = data["Model"]
    if data.get("in_huggingface_hub"):
        return make_clickable_model(name, "https://huggingface.co")
    link = data.get("Model Link")
    if link:
        return make_clickable_model(name, link)
    return name


def make_clickable_model(model_name, link):
    """Return the display label for a model.

    NOTE(review): ``link`` is accepted but not used; the label is just the
    model name rendered as a string. Confirm whether link markup is intended.
    """
    return f'{model_name}'


def rerank() -> None:
    """Re-sort every published dimension file and renumber its ``Rank``.

    Rows are ordered by ``(WISE, SICR)`` descending. ``Rank`` starts at 1 and
    is written as the first key of every row. Dimensions whose file does not
    exist are skipped.
    """
    for dimension in dimensions:
        path = _leaderboard_path(dimension)
        if not os.path.exists(path):
            continue
        rows = _read_jsonl(path)
        rows.sort(key=lambda row: (row["WISE"], row["SICR"]), reverse=True)
        # Rebuild each row so that "Rank" is the first key (first column).
        ranked = [
            {"Rank": position, **{k: v for k, v in row.items() if k != "Rank"}}
            for position, row in enumerate(rows, start=1)
        ]
        _write_jsonl(path, ranked)


def generate_sample_data() -> None:
    """Append ten rows of random sample metrics to every dimension file.

    Rows are appended, so calling this repeatedly accumulates duplicates.
    """
    model_names = [f"Model_{i}" for i in range(10)]
    os.makedirs(_LEADERBOARD_DIR, exist_ok=True)
    for dimension in dimensions:
        with open(_leaderboard_path(dimension), "a", encoding="utf-8") as f:
            for model_name in model_names:
                record = {
                    "Model": make_clickable_model(
                        model_name, "https://huggingface.co/"
                    ),
                }
                record.update(
                    (key, round(random.uniform(0, 1), 2)) for key in _METRIC_KEYS
                )
                f.write(json.dumps(record) + "\n")


def get_data(dimension) -> pd.DataFrame:
    """Load the published rows of *dimension* as a DataFrame.

    Raises:
        FileNotFoundError: if the dimension file does not exist.
    """
    return pd.DataFrame(_read_jsonl(_leaderboard_path(dimension)))


def get_submission_data() -> pd.DataFrame:
    """Return the pending submissions, or an empty DataFrame if none exist.

    Only the ``Audience`` queue file is read: ``submit`` writes one row per
    model to every dimension, so a single file lists the whole queue.
    """
    path = _queue_path("Audience")
    if is_empty(_QUEUE_DIR) or not os.path.exists(path):
        return pd.DataFrame()
    return pd.DataFrame(_read_jsonl(path))


def submit(json_file) -> str:
    """Validate a submission file and append it to the pending queue.

    Args:
        json_file: path to the submission JSON file.

    Returns:
        ``"Submission successful."`` on success, otherwise the validation
        message produced by ``check_json_file``.
    """
    flag, message = check_json_file(json_file)
    if not flag:
        return message

    with open(json_file, "r", encoding="utf-8") as f:
        data = json.load(f)

    model_name = _queued_model_name(data)
    all_dimension_data = data["dimensions"]

    # Build every record before touching the disk so that an unexpected
    # problem cannot leave the queue written for only some dimensions.
    records = {}
    for dimension in dimensions:
        scores = all_dimension_data[dimension]
        record = {"Model": model_name}
        record.update((key, scores[key]) for key in _METRIC_KEYS)
        records[dimension] = record

    os.makedirs(_QUEUE_DIR, exist_ok=True)
    for dimension, record in records.items():
        # Append mode creates the queue file when it does not exist yet.
        with open(_queue_path(dimension), "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
    return "Submission successful."


def refresh() -> None:
    """Merge queued submissions into the published files, then re-rank.

    For each dimension, a queued row replaces the first published row with
    the same ``Model`` or is appended when there is none. The queue file is
    removed after it has been merged. Does nothing when the queue is empty.
    """
    if is_empty(_QUEUE_DIR):
        return

    os.makedirs(_LEADERBOARD_DIR, exist_ok=True)
    for dimension in dimensions:
        queue_path = _queue_path(dimension)
        if not os.path.exists(queue_path):
            continue
        queued = _read_jsonl(queue_path)

        board_path = _leaderboard_path(dimension)
        published = _read_jsonl(board_path) if os.path.exists(board_path) else []

        # Map each model to the position of its first row for O(1) lookup.
        index_by_model = {}
        for position, row in enumerate(published):
            index_by_model.setdefault(row["Model"], position)

        for row in queued:
            position = index_by_model.get(row["Model"])
            if position is None:
                index_by_model[row["Model"]] = len(published)
                published.append(row)
            else:
                published[position] = row

        _write_jsonl(board_path, published)
        os.remove(queue_path)
    rerank()


def check_json_file(json_file) -> tuple:
    """Validate a submission file.

    Args:
        json_file: path to the submission JSON file.

    Returns:
        ``(True, "JSON file is valid.")`` when the file can be submitted,
        otherwise ``(False, reason)``.
    """
    with open(json_file, "r", encoding="utf-8") as f:
        try:
            data = json.load(f)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return False, "JSON file is not valid JSON."

    if not isinstance(data, dict):
        return False, "JSON file must contain a JSON object."
    model = data.get("Model")
    if not isinstance(model, str) or not model:
        return False, "JSON file does not contain a valid 'Model' name."

    # Reject a model that is already waiting in the queue. Exact comparison
    # is used so that a name which is merely a substring of a queued name
    # (e.g. "bert" vs "bert-large") is not rejected.
    queue = get_submission_data()
    if "Model" in queue.columns:
        if (queue["Model"] == _queued_model_name(data)).any():
            return False, "Model already in submission queue."

    # The "dimensions" key must exist and cover every known dimension.
    if "dimensions" not in data:
        return False, "JSON file does not contain 'dimensions' key."
    all_dimension_data = data["dimensions"]
    if not isinstance(all_dimension_data, dict) or not all(
        d in all_dimension_data for d in dimensions
    ):
        return False, "JSON file does not contain all dimensions."

    # Every dimension must provide every metric, and each must be numeric:
    # rerank() sorts on these values and would fail on strings or nulls.
    for d in dimensions:
        each_dimension_data = all_dimension_data[d]
        if not isinstance(each_dimension_data, dict) or not all(
            k in each_dimension_data for k in _METRIC_KEYS
        ):
            return False, f"Dimension '{d}' does not contain all required keys."
        if not all(_is_number(each_dimension_data[k]) for k in _METRIC_KEYS):
            return False, f"Dimension '{d}' contains non-numeric metric values."

    return True, "JSON file is valid."


def is_empty(dir_path) -> bool:
    """Return True when *dir_path* holds no ``.jsonl`` files.

    A directory that does not exist is treated as empty.
    """
    if not os.path.isdir(dir_path):
        return True
    return not any(name.endswith(".jsonl") for name in os.listdir(dir_path))