Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -1,173 +1,173 @@
|
|
1 |
-
import os
|
2 |
-
import json
|
3 |
-
import random
|
4 |
-
import uuid
|
5 |
-
from flask import Flask, request, jsonify, session, render_template
|
6 |
-
from flask_cors import CORS
|
7 |
-
from datetime import datetime
|
8 |
-
from elo_rank import EloRank
|
9 |
-
|
10 |
-
app = Flask(__name__)
|
11 |
-
CORS(app)
|
12 |
-
app.secret_key = 'supersecretkey'
|
13 |
-
|
14 |
-
DATA_DIR = './data'
|
15 |
-
RESULTS_DIR = './results'
|
16 |
-
|
17 |
-
# 实例化 EloRank 系统
|
18 |
-
elo_rank_system = EloRank()
|
19 |
-
|
20 |
-
# 初始化 Elo 排名的模型
|
21 |
-
models = ['output_path_4o', 'output_path_miniomni', 'output_path_speechgpt', 'output_path_funaudio', 'output_path_4o_cascade', 'output_path_4o_llama_omni']
|
22 |
-
for model in models:
|
23 |
-
elo_rank_system.add_model(model)
|
24 |
-
|
25 |
-
|
26 |
-
def load_test_data(task):
|
27 |
-
"""Load the JSON file corresponding to the selected task"""
|
28 |
-
with open(os.path.join(DATA_DIR, f"{task}.json"), "r", encoding='utf-8') as f:
|
29 |
-
test_data = json.load(f)
|
30 |
-
|
31 |
-
# 更新音频路径,将它们指向 Flask 静态文件夹
|
32 |
-
for item in test_data:
|
33 |
-
item['input_path'] = f"/static/audio{item['input_path']}"
|
34 |
-
item['output_path_4o'] = f"/static/audio{item['output_path_4o']}"
|
35 |
-
item['output_path_miniomni'] = f"/static/audio{item['output_path_miniomni']}"
|
36 |
-
item['output_path_speechgpt'] = f"/static/audio{item['output_path_speechgpt']}"
|
37 |
-
item['output_path_funaudio'] = f"/static/audio{item['output_path_funaudio']}"
|
38 |
-
item['output_path_4o_cascade'] = f"/static/audio{item['output_path_4o_cascade']}"
|
39 |
-
item['output_path_4o_llama_omni'] = f"/static/audio{item['output_path_4o_llama_omni']}"
|
40 |
-
|
41 |
-
return test_data
|
42 |
-
|
43 |
-
|
44 |
-
def save_result(task, username, result_data, session_id):
|
45 |
-
"""Save user's result in a separate file"""
|
46 |
-
file_path = os.path.join(RESULTS_DIR, f"{task}_{username}_{session_id}.jsonl")
|
47 |
-
# 获取所有模型的 Elo 分数
|
48 |
-
elo_scores = {model: elo_rank_system.get_rating(model) for model in models}
|
49 |
-
|
50 |
-
# 添加 Elo 分数和时间戳到结果数据
|
51 |
-
result_data['elo_scores'] = elo_scores
|
52 |
-
result_data['timestamp'] = datetime.now().isoformat()
|
53 |
-
with open(file_path, "a", encoding='utf-8') as f:
|
54 |
-
f.write(json.dumps(result_data) + "\n")
|
55 |
-
|
56 |
-
@app.route('/start_test', methods=['POST'])
|
57 |
-
def start_test():
|
58 |
-
"""Initiate the test for a user with the selected task"""
|
59 |
-
data = request.json
|
60 |
-
task = data['task']
|
61 |
-
username = data['username']
|
62 |
-
|
63 |
-
# Load the test data
|
64 |
-
test_data = load_test_data(task)
|
65 |
-
|
66 |
-
# Shuffle test data for the user
|
67 |
-
random.shuffle(test_data)
|
68 |
-
|
69 |
-
# Generate a unique session ID (for example using uuid)
|
70 |
-
session_id = str(uuid.uuid4())
|
71 |
-
|
72 |
-
# Store in session
|
73 |
-
session['task'] = task
|
74 |
-
session['username'] = username
|
75 |
-
session['test_data'] = test_data
|
76 |
-
session['current_index'] = 0
|
77 |
-
session['session_id'] = session_id # Store the session ID in the session
|
78 |
-
|
79 |
-
task_description = test_data[0].get('task_description', '')
|
80 |
-
|
81 |
-
return jsonify({"message": "Test started", "total_tests": len(test_data), "task_description": task_description})
|
82 |
-
|
83 |
-
|
84 |
-
|
85 |
-
@app.route('/next_test', methods=['GET'])
|
86 |
-
def next_test():
|
87 |
-
"""Serve the next test item"""
|
88 |
-
if 'current_index' not in session or 'test_data' not in session:
|
89 |
-
return jsonify({"message": "No active test found"}), 400
|
90 |
-
|
91 |
-
current_index = session['current_index']
|
92 |
-
test_data = session['test_data']
|
93 |
-
|
94 |
-
if current_index >= len(test_data):
|
95 |
-
# Return the "Test completed" message when all tests are done
|
96 |
-
return jsonify({"message": "Test completed"}), 200
|
97 |
-
|
98 |
-
# 使用 EloRank 的 sample_next_match 来选择两款模型
|
99 |
-
selected_models = elo_rank_system.sample_next_match()
|
100 |
-
|
101 |
-
# Serve test data with the two selected models
|
102 |
-
current_test = test_data[current_index]
|
103 |
-
session['selected_models'] = selected_models
|
104 |
-
session['current_index'] += 1
|
105 |
-
|
106 |
-
return jsonify({
|
107 |
-
"text": current_test["text"],
|
108 |
-
"input_path": current_test["input_path"],
|
109 |
-
"model_a": selected_models[0],
|
110 |
-
"model_b": selected_models[1],
|
111 |
-
"audio_a": current_test[selected_models[0]],
|
112 |
-
"audio_b": current_test[selected_models[1]]
|
113 |
-
})
|
114 |
-
|
115 |
-
@app.route('/submit_result', methods=['POST'])
|
116 |
-
def submit_result():
|
117 |
-
"""Submit the user's result and save it"""
|
118 |
-
data = request.json
|
119 |
-
chosen_model = data['chosen_model']
|
120 |
-
|
121 |
-
username = session.get('username')
|
122 |
-
task = session.get('task')
|
123 |
-
current_index = session.get('current_index') - 1 # Subtract since we increment after serving
|
124 |
-
session_id = session.get('session_id') # Get the session ID
|
125 |
-
|
126 |
-
if not username or not task or current_index < 0:
|
127 |
-
return jsonify({"message": "No active test found"}), 400
|
128 |
-
|
129 |
-
# Retrieve the selected models
|
130 |
-
selected_models = session['selected_models']
|
131 |
-
model_a = selected_models[0]
|
132 |
-
model_b = selected_models[1]
|
133 |
-
|
134 |
-
result = {
|
135 |
-
"name": username,
|
136 |
-
"chosen_model": chosen_model,
|
137 |
-
"model_a": model_a,
|
138 |
-
"model_b": model_b,
|
139 |
-
"result": {
|
140 |
-
model_a: 1 if chosen_model == 'A' else 0,
|
141 |
-
model_b: 1 if chosen_model == 'B' else 0
|
142 |
-
}
|
143 |
-
}
|
144 |
-
|
145 |
-
# Save the result for the current test using session_id to avoid filename conflict
|
146 |
-
test_data = session['test_data'][current_index]
|
147 |
-
result_data = {**test_data, **result}
|
148 |
-
save_result(task, username, result_data, session_id)
|
149 |
-
|
150 |
-
# 更新 Elo 排名系统
|
151 |
-
if chosen_model == 'A':
|
152 |
-
elo_rank_system.record_match(model_a, model_b)
|
153 |
-
else:
|
154 |
-
elo_rank_system.record_match(model_b, model_a)
|
155 |
-
|
156 |
-
return jsonify({"message": "Result submitted", "model_a": model_a, "model_b": model_b, "chosen_model": chosen_model})
|
157 |
-
|
158 |
-
@app.route('/end_test', methods=['GET'])
|
159 |
-
def end_test():
|
160 |
-
"""End the test session"""
|
161 |
-
session.clear()
|
162 |
-
return jsonify({"message": "Test completed"})
|
163 |
-
|
164 |
-
# 渲染index.html页面
|
165 |
-
@app.route('/')
|
166 |
-
def index():
|
167 |
-
return render_template('index.html')
|
168 |
-
|
169 |
-
if __name__ == '__main__':
|
170 |
-
if not os.path.exists(RESULTS_DIR):
|
171 |
-
os.makedirs(RESULTS_DIR)
|
172 |
-
# 允许局域网访问
|
173 |
-
app.run(debug=True, host="0.0.0.0", port=
|
|
|
1 |
+
import os
|
2 |
+
import json
|
3 |
+
import random
|
4 |
+
import uuid
|
5 |
+
from flask import Flask, request, jsonify, session, render_template
|
6 |
+
from flask_cors import CORS
|
7 |
+
from datetime import datetime
|
8 |
+
from elo_rank import EloRank
|
9 |
+
|
10 |
+
app = Flask(__name__)
|
11 |
+
CORS(app)
|
12 |
+
app.secret_key = 'supersecretkey'
|
13 |
+
|
14 |
+
DATA_DIR = './data'
|
15 |
+
RESULTS_DIR = './results'
|
16 |
+
|
17 |
+
# 实例化 EloRank 系统
|
18 |
+
elo_rank_system = EloRank()
|
19 |
+
|
20 |
+
# 初始化 Elo 排名的模型
|
21 |
+
models = ['output_path_4o', 'output_path_miniomni', 'output_path_speechgpt', 'output_path_funaudio', 'output_path_4o_cascade', 'output_path_4o_llama_omni']
|
22 |
+
for model in models:
|
23 |
+
elo_rank_system.add_model(model)
|
24 |
+
|
25 |
+
|
26 |
+
def load_test_data(task):
|
27 |
+
"""Load the JSON file corresponding to the selected task"""
|
28 |
+
with open(os.path.join(DATA_DIR, f"{task}.json"), "r", encoding='utf-8') as f:
|
29 |
+
test_data = json.load(f)
|
30 |
+
|
31 |
+
# 更新音频路径,将它们指向 Flask 静态文件夹
|
32 |
+
for item in test_data:
|
33 |
+
item['input_path'] = f"/static/audio{item['input_path']}"
|
34 |
+
item['output_path_4o'] = f"/static/audio{item['output_path_4o']}"
|
35 |
+
item['output_path_miniomni'] = f"/static/audio{item['output_path_miniomni']}"
|
36 |
+
item['output_path_speechgpt'] = f"/static/audio{item['output_path_speechgpt']}"
|
37 |
+
item['output_path_funaudio'] = f"/static/audio{item['output_path_funaudio']}"
|
38 |
+
item['output_path_4o_cascade'] = f"/static/audio{item['output_path_4o_cascade']}"
|
39 |
+
item['output_path_4o_llama_omni'] = f"/static/audio{item['output_path_4o_llama_omni']}"
|
40 |
+
|
41 |
+
return test_data
|
42 |
+
|
43 |
+
|
44 |
+
def save_result(task, username, result_data, session_id):
|
45 |
+
"""Save user's result in a separate file"""
|
46 |
+
file_path = os.path.join(RESULTS_DIR, f"{task}_{username}_{session_id}.jsonl")
|
47 |
+
# 获取所有模型的 Elo 分数
|
48 |
+
elo_scores = {model: elo_rank_system.get_rating(model) for model in models}
|
49 |
+
|
50 |
+
# 添加 Elo 分数和时间戳到结果数据
|
51 |
+
result_data['elo_scores'] = elo_scores
|
52 |
+
result_data['timestamp'] = datetime.now().isoformat()
|
53 |
+
with open(file_path, "a", encoding='utf-8') as f:
|
54 |
+
f.write(json.dumps(result_data) + "\n")
|
55 |
+
|
56 |
+
@app.route('/start_test', methods=['POST'])
|
57 |
+
def start_test():
|
58 |
+
"""Initiate the test for a user with the selected task"""
|
59 |
+
data = request.json
|
60 |
+
task = data['task']
|
61 |
+
username = data['username']
|
62 |
+
|
63 |
+
# Load the test data
|
64 |
+
test_data = load_test_data(task)
|
65 |
+
|
66 |
+
# Shuffle test data for the user
|
67 |
+
random.shuffle(test_data)
|
68 |
+
|
69 |
+
# Generate a unique session ID (for example using uuid)
|
70 |
+
session_id = str(uuid.uuid4())
|
71 |
+
|
72 |
+
# Store in session
|
73 |
+
session['task'] = task
|
74 |
+
session['username'] = username
|
75 |
+
session['test_data'] = test_data
|
76 |
+
session['current_index'] = 0
|
77 |
+
session['session_id'] = session_id # Store the session ID in the session
|
78 |
+
|
79 |
+
task_description = test_data[0].get('task_description', '')
|
80 |
+
|
81 |
+
return jsonify({"message": "Test started", "total_tests": len(test_data), "task_description": task_description})
|
82 |
+
|
83 |
+
|
84 |
+
|
85 |
+
@app.route('/next_test', methods=['GET'])
|
86 |
+
def next_test():
|
87 |
+
"""Serve the next test item"""
|
88 |
+
if 'current_index' not in session or 'test_data' not in session:
|
89 |
+
return jsonify({"message": "No active test found"}), 400
|
90 |
+
|
91 |
+
current_index = session['current_index']
|
92 |
+
test_data = session['test_data']
|
93 |
+
|
94 |
+
if current_index >= len(test_data):
|
95 |
+
# Return the "Test completed" message when all tests are done
|
96 |
+
return jsonify({"message": "Test completed"}), 200
|
97 |
+
|
98 |
+
# 使用 EloRank 的 sample_next_match 来选择两款模型
|
99 |
+
selected_models = elo_rank_system.sample_next_match()
|
100 |
+
|
101 |
+
# Serve test data with the two selected models
|
102 |
+
current_test = test_data[current_index]
|
103 |
+
session['selected_models'] = selected_models
|
104 |
+
session['current_index'] += 1
|
105 |
+
|
106 |
+
return jsonify({
|
107 |
+
"text": current_test["text"],
|
108 |
+
"input_path": current_test["input_path"],
|
109 |
+
"model_a": selected_models[0],
|
110 |
+
"model_b": selected_models[1],
|
111 |
+
"audio_a": current_test[selected_models[0]],
|
112 |
+
"audio_b": current_test[selected_models[1]]
|
113 |
+
})
|
114 |
+
|
115 |
+
@app.route('/submit_result', methods=['POST'])
|
116 |
+
def submit_result():
|
117 |
+
"""Submit the user's result and save it"""
|
118 |
+
data = request.json
|
119 |
+
chosen_model = data['chosen_model']
|
120 |
+
|
121 |
+
username = session.get('username')
|
122 |
+
task = session.get('task')
|
123 |
+
current_index = session.get('current_index') - 1 # Subtract since we increment after serving
|
124 |
+
session_id = session.get('session_id') # Get the session ID
|
125 |
+
|
126 |
+
if not username or not task or current_index < 0:
|
127 |
+
return jsonify({"message": "No active test found"}), 400
|
128 |
+
|
129 |
+
# Retrieve the selected models
|
130 |
+
selected_models = session['selected_models']
|
131 |
+
model_a = selected_models[0]
|
132 |
+
model_b = selected_models[1]
|
133 |
+
|
134 |
+
result = {
|
135 |
+
"name": username,
|
136 |
+
"chosen_model": chosen_model,
|
137 |
+
"model_a": model_a,
|
138 |
+
"model_b": model_b,
|
139 |
+
"result": {
|
140 |
+
model_a: 1 if chosen_model == 'A' else 0,
|
141 |
+
model_b: 1 if chosen_model == 'B' else 0
|
142 |
+
}
|
143 |
+
}
|
144 |
+
|
145 |
+
# Save the result for the current test using session_id to avoid filename conflict
|
146 |
+
test_data = session['test_data'][current_index]
|
147 |
+
result_data = {**test_data, **result}
|
148 |
+
save_result(task, username, result_data, session_id)
|
149 |
+
|
150 |
+
# 更新 Elo 排名系统
|
151 |
+
if chosen_model == 'A':
|
152 |
+
elo_rank_system.record_match(model_a, model_b)
|
153 |
+
else:
|
154 |
+
elo_rank_system.record_match(model_b, model_a)
|
155 |
+
|
156 |
+
return jsonify({"message": "Result submitted", "model_a": model_a, "model_b": model_b, "chosen_model": chosen_model})
|
157 |
+
|
158 |
+
@app.route('/end_test', methods=['GET'])
|
159 |
+
def end_test():
|
160 |
+
"""End the test session"""
|
161 |
+
session.clear()
|
162 |
+
return jsonify({"message": "Test completed"})
|
163 |
+
|
164 |
+
# 渲染index.html页面
|
165 |
+
@app.route('/')
|
166 |
+
def index():
|
167 |
+
return render_template('index.html')
|
168 |
+
|
169 |
+
if __name__ == '__main__':
|
170 |
+
if not os.path.exists(RESULTS_DIR):
|
171 |
+
os.makedirs(RESULTS_DIR)
|
172 |
+
# 允许局域网访问
|
173 |
+
app.run(debug=True, host="0.0.0.0", port=8080)
|