|
from toolbox import CatchException, report_execption, select_api_key, update_ui, write_results_to_file, get_conf |
|
from .crazy_utils import request_gpt_model_in_new_thread_with_ui_alive |
|
|
|
def split_audio_file(filename, split_duration=1000):
    """
    Split an audio file into consecutive segments of a fixed duration.

    Each segment is written as an MP3 file under ``gpt_log/mp3/cut/`` and is
    named ``<stem>_<index>.mp3``, where ``<stem>`` is the base name of
    ``filename`` without its extension and ``<index>`` counts from 0.

    Args:
        filename (str): Path of the audio file to split.
        split_duration (int, optional): Length of each segment in seconds.
            Defaults to 1000.

    Returns:
        filelist (list): Paths of the written segment files, in playback order.
            Empty when the clip is shorter than one second, because the
            boundaries are computed in whole seconds.
    """
    from moviepy.editor import AudioFileClip
    import os

    os.makedirs('gpt_log/mp3/cut/', exist_ok=True)

    # Name the segments after the source file's stem. Indexing the path string
    # (``filename[0]``) would only yield its first character, which is
    # meaningless and makes segments of different inputs overwrite each other.
    stem = os.path.splitext(os.path.basename(filename))[0]

    filelist = []
    audio = AudioFileClip(filename)
    try:
        # Segment boundaries in whole seconds: 0, d, 2d, ... followed by the
        # total duration truncated to an integer.
        total_seconds = int(audio.duration)
        split_points = list(range(0, total_seconds, split_duration))
        split_points.append(total_seconds)

        # Walk over adjacent boundary pairs (start, end).
        for i, (start_time, end_time) in enumerate(zip(split_points, split_points[1:])):
            segment_path = f"gpt_log/mp3/cut/{stem}_{i}.mp3"
            audio.subclip(start_time, end_time).write_audiofile(segment_path)
            filelist.append(segment_path)
    finally:
        # Close the clip even when writing a segment fails.
        audio.close()

    return filelist
|
|
|
def AnalyAudio(parse_prompt, file_manifest, llm_kwargs, chatbot, history):
    """
    Transcribe each file with the Whisper endpoint and summarise it with the chat model.

    For every path in ``file_manifest`` the file is converted to MP3 when its
    extension is not one of ``.mp3/.wav/.m4a/.mpga``, split into segments with
    ``split_audio_file``, and each segment is posted to the transcription
    endpoint. Every transcript is summarised by the chat model, then a
    per-file summary is requested, and the accumulated ``history`` is written
    out with ``write_results_to_file``.

    This is a generator: it yields whatever ``update_ui`` and
    ``request_gpt_model_in_new_thread_with_ui_alive`` yield.

    Args:
        parse_prompt: Text sent as the ``prompt`` field of the transcription request.
        file_manifest: Iterable of audio/video file paths to process.
        llm_kwargs: Mapping read for the ``'api_key'`` and ``'llm_model'`` keys
            and forwarded to the chat-model helper.
        chatbot: Chat display list; entries are appended/replaced in place.
        history: Conversation history list; extended in place.

    Raises:
        RuntimeError: If the transcription endpoint answers with an HTTP error
            status, so an error payload is never summarised as if it were audio.
    """
    import os, requests, shutil
    from moviepy.editor import AudioFileClip
    from request_llm.bridge_all import model_info

    # Credentials and endpoint of the configured chat model.
    api_key = select_api_key(llm_kwargs['api_key'], llm_kwargs['llm_model'])
    chat_endpoint = model_info[llm_kwargs['llm_model']]['endpoint']

    # The transcription URL is derived from the chat URL by swapping the path.
    url = chat_endpoint.replace('chat/completions', 'audio/transcriptions')

    # Request parts that are identical for every segment.
    headers = {
        'Authorization': f"Bearer {api_key}"
    }
    data = {
        "model": "whisper-1",
        "prompt": parse_prompt,
        'response_format': "text"
    }
    proxies, = get_conf('proxies')

    os.makedirs('gpt_log/mp3/', exist_ok=True)
    try:
        for index, fp in enumerate(file_manifest):
            audio_history = []

            ext = os.path.splitext(fp)[1]

            # Convert anything that is not already in an accepted format to MP3.
            if ext not in [".mp3", ".wav", ".m4a", ".mpga"]:
                audio_clip = AudioFileClip(fp)
                try:
                    audio_clip.write_audiofile(f'gpt_log/mp3/output{index}.mp3')
                finally:
                    # Close the clip so it does not stay open for the rest of the run.
                    audio_clip.close()
                fp = f'gpt_log/mp3/output{index}.mp3'

            # Cut the audio into segments and handle them one by one.
            voice = split_audio_file(fp)
            for j, segment_path in enumerate(voice):
                with open(segment_path, 'rb') as f:
                    file_content = f.read()
                files = {
                    'file': (os.path.basename(segment_path), file_content),
                }

                chatbot.append([f"将 {segment_path} 发送到openai音频解析终端 (whisper),当前参数:{parse_prompt}", "正在处理 ..."])
                yield from update_ui(chatbot=chatbot, history=history)

                reply = requests.post(url, headers=headers, files=files, data=data, proxies=proxies)
                if not reply.ok:
                    # Fail loudly instead of feeding an error body to the summariser.
                    raise RuntimeError(
                        f"Whisper transcription request failed (HTTP {reply.status_code}): {reply.text}"
                    )
                transcript = reply.text

                chatbot.append(["音频解析结果", transcript])
                history.extend(["音频解析结果", transcript])
                yield from update_ui(chatbot=chatbot, history=history)

                # Summarise this segment's transcript.
                i_say = f'请对下面的音频片段做概述,音频内容是 ```{transcript}```'
                i_say_show_user = f'第{index + 1}段音频的第{j + 1} / {len(voice)}片段。'
                gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
                    inputs=i_say,
                    inputs_show_user=i_say_show_user,
                    llm_kwargs=llm_kwargs,
                    chatbot=chatbot,
                    history=[],
                    sys_prompt=f"总结音频。音频文件名{fp}"
                )

                chatbot[-1] = (i_say_show_user, gpt_say)
                history.extend([i_say_show_user, gpt_say])
                audio_history.extend([i_say_show_user, gpt_say])

            # Summarise the whole file from the per-segment summaries.
            result = "".join(audio_history)
            if len(audio_history) > 1:
                i_say = f"根据以上的对话,使用中文总结音频“{result}”的主要内容。"
                i_say_show_user = f'第{index + 1}段音频的主要内容:'
                gpt_say = yield from request_gpt_model_in_new_thread_with_ui_alive(
                    inputs=i_say,
                    inputs_show_user=i_say_show_user,
                    llm_kwargs=llm_kwargs,
                    chatbot=chatbot,
                    history=audio_history,
                    sys_prompt="总结文章。"
                )

                history.extend([i_say, gpt_say])
                audio_history.extend([i_say, gpt_say])

            res = write_results_to_file(history)
            chatbot.append((f"第{index + 1}段音频完成了吗?", res))
            yield from update_ui(chatbot=chatbot, history=history)
    finally:
        # Best-effort removal of the temporary audio files, also on failure.
        shutil.rmtree('gpt_log/mp3', ignore_errors=True)

    res = write_results_to_file(history)
    chatbot.append(("所有音频都总结完成了吗?", res))
    yield from update_ui(chatbot=chatbot, history=history)
|
|
|
|
|
@CatchException
def 总结音视频(txt, llm_kwargs, plugin_kwargs, chatbot, history, system_prompt, WEB_PORT):
    """
    Plugin entry point: summarise the audio/video file or folder given in ``txt``.

    This is a generator; it yields whatever ``update_ui`` and ``AnalyAudio`` yield.

    Args:
        txt: Path of a single media file, or of a folder that is searched
            recursively for files with a supported extension.
        llm_kwargs: Model settings, forwarded to ``AnalyAudio``.
        plugin_kwargs: Plugin settings; a non-empty ``"advanced_arg"`` entry is
            used as the transcription prompt. An empty one is removed from the
            mapping so the default prompt applies.
        chatbot: Chat display list; entries are appended in place.
        history: Incoming history; replaced by an empty list before processing.
        system_prompt: Not used by this function.
        WEB_PORT: Not used by this function.
    """
    import glob, os

    # Basic information: what the plugin does and who contributed it.
    chatbot.append([
        "函数插件功能?",
        "总结音视频内容,函数插件贡献者: dalvqw & BinaryHusky"])
    yield from update_ui(chatbot=chatbot, history=history)

    # Probe the optional dependency. Catch Exception rather than everything so
    # GeneratorExit / KeyboardInterrupt are not swallowed by the probe.
    try:
        from moviepy.editor import AudioFileClip  # noqa: F401 -- availability check only
    except Exception:
        report_execption(chatbot, history,
                         a=f"解析项目: {txt}",
                         b=f"导入软件依赖失败。使用该模块需要额外依赖,安装方法```pip install --upgrade moviepy```。")
        yield from update_ui(chatbot=chatbot, history=history)
        return

    # Start from an empty history.
    history = []

    # The input must be an existing path.
    if not os.path.exists(txt):
        if txt == "": txt = '空空如也的输入栏'
        report_execption(chatbot, history, a=f"解析项目: {txt}", b=f"找不到本地项目或无权访问: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)
        return
    project_folder = txt

    # Collect the files to process.
    extensions = ['.mp4', '.m4a', '.wav', '.mpga', '.mpeg', '.mp3', '.avi', '.mkv', '.flac', '.aac']

    if txt.endswith(tuple(extensions)):
        file_manifest = [txt]
    else:
        # Escape the folder part so names containing glob metacharacters
        # (e.g. "talks[2023]") are matched literally.
        search_root = glob.escape(project_folder)
        file_manifest = []
        for extension in extensions:
            file_manifest.extend(glob.glob(f'{search_root}/**/*{extension}', recursive=True))

    # Nothing to do when no media file was found.
    if not file_manifest:
        report_execption(chatbot, history, a=f"解析项目: {txt}", b=f"找不到任何音频或视频文件: {txt}")
        yield from update_ui(chatbot=chatbot, history=history)
        return

    # An empty advanced argument counts as "not provided".
    if ("advanced_arg" in plugin_kwargs) and (plugin_kwargs["advanced_arg"] == ""): plugin_kwargs.pop("advanced_arg")
    parse_prompt = plugin_kwargs.get("advanced_arg", '将音频解析为简体中文')
    yield from AnalyAudio(parse_prompt, file_manifest, llm_kwargs, chatbot, history)

    yield from update_ui(chatbot=chatbot, history=history)
|
|