Spaces:
Sleeping
Sleeping
File size: 3,892 Bytes
2fc6c45 f1c3845 2fc6c45 f1c3845 2fc6c45 595b6db 2fc6c45 595b6db b51c467 2fc6c45 d8cbf27 2fc6c45 d8cbf27 2fc6c45 f1c3845 2fc6c45 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import requests
import time
from openpyxl import load_workbook
def read_excel(file_path):
    """Build a ``music_id -> title`` lookup from the ``Results`` sheet.

    The first row is treated as a header and skipped. In every following
    row the first column is read as the title and the second column as the
    music id. Rows where either value is empty/falsy are ignored. When the
    same music id appears more than once, the last row wins.

    Args:
        file_path: Path of the ``.xlsx`` workbook to open.

    Returns:
        A dict mapping each music id (exactly as stored in the sheet) to
        its title.
    """
    # Open the workbook the caller asked for. The path used to be
    # hard-coded to 'music_id.xlsx', which silently ignored ``file_path``.
    wb = load_workbook(file_path)
    ws = wb['Results']

    music_dict = {}
    # max_col=2 limits every yielded row to the two columns used here, so
    # unpacking is safe even when the sheet is narrower or wider than that.
    for title, music_id in ws.iter_rows(min_row=2, max_col=2, values_only=True):
        if music_id and title:  # skip incomplete rows
            music_dict[music_id] = title
    return music_dict
# Look up the title for the first music id listed in ``search_score``.
def match_title_from_json(excel_dict, json_data):
    """Return the title mapped to the first music id in ``search_score``.

    Args:
        excel_dict: Mapping of music id to title.
        json_data: Mapping that may carry a ``'search_score'`` entry; the
            entry is iterated, so it can be a list of music ids or a dict
            keyed by music id.

    Returns:
        The title for the first music id yielded by ``search_score``, or
        ``''`` when that id is not in ``excel_dict`` or when
        ``search_score`` is missing, null, or empty.
    """
    # ``or []`` also covers an explicit null value, not just a missing key.
    search_scores = json_data.get('search_score') or []
    # Only the first candidate matters; returning from inside the loop
    # avoids raising StopIteration when there are no candidates at all.
    for music_id in search_scores:
        return excel_dict.get(music_id, '')
    return ''
class EvaluationProcessor:
    """Submit a recording to the evaluation web API and poll for its result.

    Every request carries the API key in an ``API-Key`` header. Audio and
    video uploads go to separate endpoints; results are fetched from a
    third endpoint by task id.
    """

    def __init__(self, api_key, *, timeout=None, poll_interval=2):
        """Store credentials, endpoints and polling settings.

        Args:
            api_key: Value sent in the ``API-Key`` request header.
            timeout: Passed as ``timeout`` to every ``requests`` call.
                ``None`` (the default) keeps the previous behaviour of
                waiting indefinitely.
            poll_interval: Seconds to sleep between result polls while a
                task is still pending or processing.
        """
        self.api_key = api_key
        self.audio_url = 'https://test.aitanzou.com/web/api/submit_audio'
        self.video_url = 'https://test.aitanzou.com/web/api/submit_video'
        self.result_url = 'https://test.aitanzou.com/web/api/getEvaluationResult'
        self.headers = {
            'API-Key': self.api_key,
        }
        # Workbook used to translate music ids into titles.
        self.excel_file_path = 'music_id.xlsx'
        self.timeout = timeout
        self.poll_interval = poll_interval

    def submit_evaluation(self, file_path, music_id=None, hand=-1, order_start=0,
                          order_end=-1, repeat_type=0, is_video=False):
        """Upload a recording and return the task id assigned by the server.

        Args:
            file_path: Local path of the audio or video file to upload.
            music_id: Sent as the ``musicId`` form field.
            hand: Sent as the ``hand`` form field.
            order_start: Sent as the ``orderStart`` form field.
            order_end: Sent as the ``orderEnd`` form field.
            repeat_type: Sent as the ``repeatType`` form field.
            is_video: Upload to the video endpoint as ``video/mp4`` when
                true, otherwise to the audio endpoint as ``audio/mp3``.

        Returns:
            The ``taskId`` found under ``data`` in the JSON response.

        Raises:
            Exception: If the server answers with a status other than 200.
        """
        if is_video:
            url, file_name, file_type = self.video_url, "video_file.mp4", 'video/mp4'
        else:
            url, file_name, file_type = self.audio_url, "audio_file.mp3", 'audio/mp3'

        data = {
            'musicId': music_id,
            'hand': hand,
            'orderStart': order_start,
            'orderEnd': order_end,
            'repeatType': repeat_type,
        }

        # Keep the upload inside a context manager so the file handle is
        # closed even when the request raises.
        with open(file_path, 'rb') as upload:
            files = {'file': (file_name, upload, file_type)}
            response = requests.post(
                url,
                headers=self.headers,
                files=files,
                data=data,
                timeout=self.timeout,
            )

        if response.status_code != 200:
            raise Exception(f'Error: {response.status_code}, {response.text}')
        return response.json()['data']['taskId']

    def get_evaluation_result(self, task_id):
        """Poll the result endpoint until the task finishes.

        While the reported status is ``pending`` or ``processing`` a
        progress line is printed and the call sleeps for
        ``self.poll_interval`` seconds before asking again.

        Args:
            task_id: Task id returned by ``submit_evaluation``.

        Returns:
            A tuple ``(data, title)``: the ``data`` object of the completed
            response, and the title looked up from the workbook at
            ``self.excel_file_path``.

        Raises:
            Exception: If the server answers with a status other than 200,
                or the task reports any status other than ``pending``,
                ``processing`` or ``completed``.
        """
        params = {'taskId': task_id}
        while True:
            result_response = requests.get(
                self.result_url,
                headers=self.headers,
                params=params,
                timeout=self.timeout,
            )
            if result_response.status_code != 200:
                raise Exception(
                    f'Error: {result_response.status_code}, {result_response.text}'
                )

            result_data = result_response.json()
            status = result_data['data']['status']

            if status in ('pending', 'processing'):
                print(f'Task is still {status}...')
                time.sleep(self.poll_interval)
                continue

            if status == 'completed':
                title_musicid_dict = read_excel(self.excel_file_path)
                # Resolve the matching title for the returned music ids.
                title = match_title_from_json(title_musicid_dict, result_data['data'])
                return result_data['data'], title

            # Any other status is a failure. Fall back to the status itself
            # when the response has no 'message', so the real failure is
            # not hidden behind a KeyError.
            message = result_data.get('message', status)
            raise Exception(f'Task failed: {message}')

    def process_evaluation(self, file_path, music_id=None, hand=-1, order_start=0,
                           order_end=-1, repeat_type=0, is_video=False):
        """Submit a recording and block until its evaluation result is ready.

        Takes the same arguments as ``submit_evaluation`` and returns what
        ``get_evaluation_result`` returns for the created task.
        """
        task_id = self.submit_evaluation(
            file_path, music_id, hand, order_start, order_end, repeat_type, is_video
        )
        return self.get_evaluation_result(task_id)
|