"""Gradio app: download/recognize a video, let an LLM pick segments, clip and summarize."""

import logging
import os
import re
import sys
from urllib.parse import quote

import ffmpeg
import gradio as gr
import requests
import whisper
from huggingface_hub import whoami
from pytubefix import YouTube
from yidong import YiDong

from llm.openai_api import openai_call
from llm.yi_moe_api import yi_moe
from utils.trans_utils import extract_timestamps
from videoclipper import VideoClipper

API_URL_TEMPLATE = "https://api-yidong.lingyiwanwu.com/v1/ops/api_key?user_email={user_email}&user_source=huggingface"

# Upper bound (seconds) for the API-key request so a stalled endpoint cannot
# hang the page-load callback forever.
REQUEST_TIMEOUT_SECONDS = 30

# Prefixes that llm_inference can actually dispatch with the clients imported above.
SUPPORT_LLM_PREFIX = ('gpt', 'moonshot', '22A')

# Prefixes whose client functions (call_qwen_model / g4f_openai_call) are not
# imported anywhere in this module; calling them used to raise NameError.
# TODO(review): import the matching clients and restore the dispatch if these
# backends are meant to be supported.
_UNAVAILABLE_LLM_PREFIXES = ('qwen', 'g4f')

model = whisper.load_model("tiny.en")
audio_clipper = VideoClipper(model)


def _normalize_output_dir(output_dir):
    """Return *output_dir* as an absolute path, or None when it is blank/None."""
    cleaned = (output_dir or "").strip()
    return os.path.abspath(cleaned) if cleaned else None


def get_user_email(oauth_token: gr.OAuthToken | None) -> str | None:
    """Return the API key shown to the logged-in Hugging Face user.

    Despite the name, the returned value is the ``display_api_key`` field of
    the key-service response, not the e-mail address.

    Args:
        oauth_token: OAuth token of the current visitor, or None when nobody
            is logged in.

    Returns:
        The ``display_api_key`` string, or None when there is no token or the
        account information carries no e-mail.

    Raises:
        requests.HTTPError: If the key service answers with an error status.
    """

    def call_api(user_email):
        # Percent-encode the address: a raw "+" in a query string is commonly
        # decoded as a space by the receiving server.
        url = API_URL_TEMPLATE.format(user_email=quote(user_email, safe=""))
        headers = {"Authorization": f'Basic {os.getenv("AUTH")}'}
        response = requests.post(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()["data"]["display_api_key"]

    if oauth_token is None:
        return None

    user_info = whoami(token=oauth_token.token)
    email = user_info.get("email")
    if not email:
        # Without an e-mail there is nothing to look up; avoid sending "None".
        return None
    return call_api(email)


def get_video_size(stream):
    """Return ``stream.filesize`` converted from bytes to mebibytes."""
    return stream.filesize / (1024 * 1024)


def download_youtube_video(url, stream_index=None):
    """Download a YouTube video into the ``videos`` folder and return its path.

    The available video streams are listed on stdout, ordered by descending
    resolution. Non-progressive streams are downloaded together with the first
    audio-only stream and merged with ffmpeg.

    Args:
        url: URL of the YouTube video.
        stream_index: Index into the listed video streams. When None, the
            index is read from stdin if stdin is an interactive terminal;
            otherwise index 0 (first stream in the listing) is used, because
            a web callback has nobody to answer the prompt.

    Returns:
        Path of the downloaded (or merged) file, or None if anything failed;
        the failure is reported on stdout.
    """
    try:
        yt = YouTube(url, use_po_token=True)
        video_streams = yt.streams.filter(type="video").order_by('resolution').desc()
        audio_stream = yt.streams.filter(only_audio=True).first()

        print("Available video streams:")
        for i, stream in enumerate(video_streams):
            size = get_video_size(stream)
            stream_type = "Progressive" if stream.is_progressive else "Adaptive"
            print(f"{i}. Resolution: {stream.resolution}, Size: {size:.2f} MB, Type: {stream_type}")

        if stream_index is None:
            if sys.stdin is not None and sys.stdin.isatty():
                stream_index = int(input("Enter the number of the video stream to download: "))
            else:
                stream_index = 0
        selected_stream = video_streams[stream_index]

        os.makedirs('videos', exist_ok=True)

        print(f"Downloading video: {yt.title}")
        video_file = selected_stream.download(output_path='videos', filename_prefix="video_")

        if not selected_stream.is_progressive:
            if audio_stream is None:
                raise RuntimeError("No audio-only stream is available to merge with the selected video stream.")

            print("Downloading audio...")
            audio_file = audio_stream.download(output_path='videos', filename_prefix="audio_")

            print("Merging video and audio...")
            # Titles may contain path separators or other characters that are
            # invalid in file names; replace them before building the path.
            safe_title = re.sub(r'[\\/:*?"<>|]+', "_", yt.title).strip() or "video"
            output_file = os.path.join('videos', f"{safe_title}.mp4")
            stream = ffmpeg.input(video_file)
            audio = ffmpeg.input(audio_file)
            stream = ffmpeg.output(stream, audio, output_file, vcodec='libx264', acodec='aac', strict='experimental')
            ffmpeg.run(stream, overwrite_output=True)

            os.remove(video_file)
            os.remove(audio_file)
        else:
            output_file = video_file

        print(f"Downloaded: {yt.title} to 'videos' folder")
        print(f"File path: {output_file}")
        return output_file

    except Exception as e:
        # Best-effort boundary: report the problem and let the caller see None.
        print(f"An error occurred: {str(e)}")
        print("Please make sure you have the latest version of pytubefix and ffmpeg-python installed.")
        print("You can update them by running:")
        print("pip install --upgrade pytubefix ffmpeg-python")
        print("Also, ensure that ffmpeg is installed on your system and available in your PATH.")
        return None


def updata_video(url):
    """Button callback: download *url* and return the local file path (or None)."""
    video_path = download_youtube_video(url)
    return video_path


def audio_recog(audio_input, output_dir):
    """Run ``audio_clipper.recog`` on *audio_input* and return its result."""
    return audio_clipper.recog(audio_input, None, output_dir=output_dir)


def video_recog(video_input, output_dir, ASR):
    """Run ``audio_clipper.video_recog`` on *video_input* and return its result."""
    return audio_clipper.video_recog(video_input, output_dir=output_dir, ASR=ASR)


def video_clip(dest_text, video_spk_input, start_ost, end_ost, state, output_dir):
    """Forward the clipping request to ``audio_clipper.video_clip``."""
    return audio_clipper.video_clip(
        dest_text, start_ost, end_ost, state, dest_spk=video_spk_input, output_dir=output_dir
    )


def mix_recog(video_input, audio_input, output_dir, ASR="whisper"):
    """Recognize a video or an audio input.

    The video input takes precedence when both are supplied.

    Args:
        video_input: Video to recognize, or None.
        audio_input: Audio to recognize, or None.
        output_dir: Output directory; blank or None means "no directory".
        ASR: Name of the ASR backend forwarded to the video recognizer.

    Returns:
        ``(text, srt, video_state, audio_state)`` where the state that does
        not belong to the recognized input is None.

    Raises:
        gr.Error: If neither a video nor an audio input was provided.
    """
    output_dir = _normalize_output_dir(output_dir)

    if video_input is not None:
        res_text, res_srt, video_state = video_recog(
            video_input, output_dir=output_dir, ASR=ASR)
        return res_text, res_srt, video_state, None

    if audio_input is not None:
        res_text, res_srt, audio_state = audio_recog(
            audio_input, output_dir=output_dir)
        return res_text, res_srt, None, audio_state

    # Previously this fell through and returned a bare None for four outputs.
    raise gr.Error("Please provide a video or an audio input before running recognition.")


def llm_inference(system_content, user_content, srt_text, model, apikey):
    """Send the prompt plus SRT text to the LLM selected by the *model* prefix.

    Args:
        system_content: System prompt.
        user_content: User instruction; the SRT text is appended after a newline.
        srt_text: SRT subtitles produced by recognition.
        model: Model name; its prefix selects the client.
        apikey: API key forwarded to the client.

    Returns:
        Whatever the selected client returns.

    Raises:
        gr.Error: If the model prefix is unsupported, or names a backend whose
            client is not imported in this module.
    """
    prompt = user_content + '\n' + srt_text

    if model.startswith(_UNAVAILABLE_LLM_PREFIXES):
        message = f"LLM backend for '{model}' is not available: its client is not imported in this module."
        logging.error(message)
        raise gr.Error(message)

    if model.startswith(('gpt', 'moonshot')):
        return openai_call(apikey, model, system_content=system_content, user_content=prompt)

    if model.startswith('22A'):
        return yi_moe(apikey, model, prompt, system_content)

    message = "LLM name error, only {} are supported as LLM name prefix.".format(list(SUPPORT_LLM_PREFIX))
    logging.error(message)
    raise gr.Error(message)


def _ai_clip(LLM_res, dest_text, video_spk_input, start_ost, end_ost, video_state, audio_state, output_dir):
    """Clip the video (preferred) or the audio at the timestamps found in *LLM_res*.

    Returns ``(video_file, audio, message, srt)``; the entry that does not
    apply is None. Returns None when both states are None.
    """
    timestamp_list = extract_timestamps(LLM_res)
    output_dir = _normalize_output_dir(output_dir)

    if video_state is not None:
        clip_video_file, message, clip_srt = audio_clipper.video_clip(
            dest_text, start_ost, end_ost, video_state,
            dest_spk=video_spk_input, output_dir=output_dir,
            timestamp_list=timestamp_list, add_sub=False)
        return clip_video_file, None, message, clip_srt

    if audio_state is not None:
        (sr, res_audio), message, clip_srt = audio_clipper.clip(
            dest_text, start_ost, end_ost, audio_state,
            dest_spk=video_spk_input, output_dir=output_dir,
            timestamp_list=timestamp_list, add_sub=False)
        return None, (sr, res_audio), message, clip_srt

    return None


def _summarize_video(api_key, input_file):
    """Upload *input_file* through the YiDong client and return its video summary."""
    yd = YiDong(api_key=api_key)
    rid = yd.add_resource(input_file)
    task = yd.video_summary(rid)
    data = task()
    return data.video_summary.summary


def clip_and_summary(LLM_res, dest_text, video_spk_input, start_ost, end_ost, video_state, audio_state, output_dir, apikey):
    """Clip according to the LLM result and summarize the clipped video.

    Args:
        LLM_res: LLM output from which timestamps are extracted.
        dest_text: Text to clip, forwarded to the clipper.
        video_spk_input: Speaker to clip, forwarded as ``dest_spk``.
        start_ost: Start offset forwarded to the clipper.
        end_ost: End offset forwarded to the clipper.
        video_state: Recognition state of the video, or None.
        audio_state: Recognition state of the audio, or None.
        output_dir: Output directory; blank or None means "no directory".
        apikey: API key for the YiDong summary service.

    Returns:
        ``(video_file, audio, message, srt, summary)``. ``summary`` is None
        when no video file was produced (for example for audio-only clips).

    Raises:
        gr.Error: If recognition has not produced any state yet.
    """
    if video_state is None and audio_state is None:
        # Without a state the clipper has nothing to work on; the old code
        # crashed while unpacking None.
        raise gr.Error("Please run recognition (ASR) before clipping.")

    clip_video_file, clip_audio, message, clip_srt = _ai_clip(
        LLM_res, dest_text, video_spk_input, start_ost, end_ost,
        video_state, audio_state, output_dir)

    # Only a video file can be summarized; audio clips yield no file here.
    summary = None
    if clip_video_file is not None:
        summary = _summarize_video(apikey, clip_video_file)

    return clip_video_file, clip_audio, message, clip_srt, summary


# NOTE(review): the source this was restored from had lost its indentation, so
# the Row/Column nesting below is a reconstruction. It affects layout only;
# please confirm it against the intended page design.
with gr.Blocks() as clip_service:
    video_state, audio_state = gr.State(), gr.State()

    with gr.Row():
        login_button = gr.LoginButton()
        user_email_display = gr.Textbox(
            label="In order to get your user key, please click on huggingface login, the first time you login you will have the full key, please save it. After that your key will be hidden.",
            interactive=True,
        )
    clip_service.load(get_user_email, inputs=None, outputs=user_email_display)

    youtube_url = gr.Textbox(label="🔗 Youtube视频链接|Youtube Video URL")
    download_button = gr.Button("📥 下载 | Download", variant="primary")
    video_input = gr.Video(label="视频输入 | Video Input")
    audio_input = gr.Audio(label="音频输入 | Audio Input")

    with gr.Column():
        gr.Examples(['https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ClipVideo/%E4%B8%BA%E4%BB%80%E4%B9%88%E8%A6%81%E5%A4%9A%E8%AF%BB%E4%B9%A6%EF%BC%9F%E8%BF%99%E6%98%AF%E6%88%91%E5%90%AC%E8%BF%87%E6%9C%80%E5%A5%BD%E7%9A%84%E7%AD%94%E6%A1%88-%E7%89%87%E6%AE%B5.mp4',
                     'https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ClipVideo/2022%E4%BA%91%E6%A0%96%E5%A4%A7%E4%BC%9A_%E7%89%87%E6%AE%B52.mp4',
                     'https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ClipVideo/%E4%BD%BF%E7%94%A8chatgpt_%E7%89%87%E6%AE%B5.mp4'],
                    [video_input],
                    label='示例视频 | Demo Video')
        gr.Examples(['https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ClipVideo/%E8%AE%BF%E8%B0%88.mp4'],
                    [video_input],
                    label='多说话人示例视频 | Multi-speaker Demo Video')
        gr.Examples(['https://isv-data.oss-cn-hangzhou.aliyuncs.com/ics/MaaS/ClipVideo/%E9%B2%81%E8%82%83%E9%87%87%E8%AE%BF%E7%89%87%E6%AE%B51.wav'],
                    [audio_input],
                    label="示例音频 | Demo Audio", visible=False)
        ASR_model = gr.Dropdown(
            choices=["whisper"],
            value="whisper",
            label="ASR Model Name",
            allow_custom_value=True)
        recog_button = gr.Button("👂 识别 | ASR", variant="primary")
        output_dir = gr.Textbox(label="📁 文件输出路径 | File Output Dir (可以为空,Linux, mac系统可以稳定使用)")

    video_text_output = gr.Textbox(label="✏️ 识别结果 | Recognition Result")
    video_srt_output = gr.Textbox(label="📖 SRT字幕内容 | RST Subtitles")
    prompt_head = gr.Textbox(label="Prompt System (按需更改,最好不要变动主体和要求)", value=("你是一个视频srt字幕分析剪辑器,输入视频的srt字幕和用户命令,请你结合用户指令选出符合要求的片段并输出。注意:要谨慎分析用户的问题,找出符合用户提问的srt字幕片段。"
            "尽可能将连续的片段并裁剪出来,将片段中在时间上连续的多个句子及它们的时间戳合并为一条,保证合并后的片段有着相同的主题"
            "注意确保文字与时间戳的正确匹配。你应该按照以下顺序进行处理:"
            "1.将srt字幕合并成数段主题的内容。2.将用户命令和查询进行匹配。"
            "输出需严格按照如下格式:1. [开始时间-结束时间] 文本,注意其中的连接符是“-”"))
    prompt_head2 = gr.Textbox(label="Prompt User(请输入用户指令)")

    with gr.Column():
        with gr.Row():
            llm_model = gr.Dropdown(
                choices=["gpt-4o", "22A"],
                value="22A",
                label="LLM Model Name",
                allow_custom_value=True)
            apikey_input = gr.Textbox(label="APIKEY")
        llm_button = gr.Button("LLM推理 | LLM Inference(首先进行识别,非g4f需配置对应apikey)", variant="primary")
        llm_result = gr.Textbox(label="LLM Clipper Result")
        llm_clip_button = gr.Button("🧠 LLM智能裁剪 | AI Clip", variant="primary")

    video_text_input = gr.Textbox(label="✏️ 待裁剪文本 | Text to Clip (多段文本使用'#'连接)", value="这个不需要", visible=False)
    video_spk_input = gr.Textbox(label="✏️ 待裁剪说话人 | Speaker to Clip (多个说话人使用'#'连接)", value="这个不需要", visible=False)

    with gr.Row():
        video_start_ost = gr.Slider(minimum=-500, maximum=1000, value=0, step=50, label="⏪ 开始位置偏移 | Start Offset (ms)", visible=False)
        video_end_ost = gr.Slider(minimum=-500, maximum=1000, value=100, step=50, label="⏩ 结束位置偏移 | End Offset (ms)", visible=False)

    video_output = gr.Video(label="裁剪结果 | Video Clipped")
    audio_output = gr.Audio(label="裁剪结果 | Audio Clipped")
    clip_message = gr.Textbox(label="⚠️ 裁剪信息 | Clipping Log", visible=False)
    srt_clipped = gr.Textbox(label="📖 裁剪部分SRT字幕内容 | Clipped RST Subtitles", visible=False)
    summary = gr.Textbox(label="📖 视频摘要 | Video Summary")

    # Event wiring.
    download_button.click(updata_video, inputs=youtube_url, outputs=video_input)
    recog_button.click(mix_recog,
                       inputs=[video_input,
                               audio_input,
                               output_dir,
                               ASR_model,
                               ],
                       outputs=[video_text_output, video_srt_output, video_state, audio_state])
    llm_button.click(llm_inference,
                     inputs=[prompt_head, prompt_head2, video_srt_output, llm_model, apikey_input],
                     outputs=[llm_result])
    # The summary service is called with the key shown in user_email_display,
    # not with the LLM key from apikey_input.
    llm_clip_button.click(clip_and_summary,
                          inputs=[llm_result,
                                  video_text_input,
                                  video_spk_input,
                                  video_start_ost,
                                  video_end_ost,
                                  video_state,
                                  audio_state,
                                  output_dir,
                                  user_email_display,
                                  ],
                          outputs=[video_output, audio_output, clip_message, srt_clipped, summary])


if __name__ == "__main__":
    clip_service.queue(
        max_size=10,
        default_concurrency_limit=10,
    )
    clip_service.launch(ssr_mode=False)