| |
| |
| import logging |
| import os |
| import subprocess |
| import tempfile |
| from typing import Optional |
|
|
| import librosa |
| import numpy as np |
| import soundfile as sf |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def video_to_frames(
    video_path: str,
    frame_save_path: str,
    fps: int = 1,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract frames from a video into the given directory via ffmpeg.

    Args:
        video_path: Input video file.
        frame_save_path: Directory receiving ``frame_%06d.jpg`` files (created if missing).
        fps: Frames extracted per second of video.
        start_time: Optional ffmpeg ``-ss`` clip start.
        end_time: Optional ffmpeg ``-to`` clip end.
    """
    os.makedirs(frame_save_path, exist_ok=True)
    command = ["ffmpeg", "-y"]
    # Only pass clip boundaries that were actually supplied.
    for flag, value in (("-ss", start_time), ("-to", end_time)):
        if value is not None:
            command.extend([flag, str(value)])
    command.extend(
        ["-i", video_path, "-vf", f"fps={fps}", os.path.join(frame_save_path, "frame_%06d.jpg")]
    )
    subprocess.run(command, check=True, capture_output=True)
|
|
|
|
def video_to_audio(
    video_path: str,
    audio_save_path: str,
    sr: int = 16000,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
) -> None:
    """Extract mono audio from a video via ffmpeg.

    Args:
        video_path: Input video file.
        audio_save_path: Output audio file path.
        sr: Target sample rate in Hz.
        start_time: Optional ffmpeg ``-ss`` clip start.
        end_time: Optional ffmpeg ``-to`` clip end.
    """
    # Fix: os.path.dirname() returns "" for a bare filename, and
    # os.makedirs("") raises FileNotFoundError — only create the parent
    # directory when one is actually present in the path.
    parent_dir = os.path.dirname(audio_save_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    cmd = ["ffmpeg", "-y"]
    if start_time is not None:
        cmd += ["-ss", str(start_time)]
    if end_time is not None:
        cmd += ["-to", str(end_time)]

    # -vn: drop the video stream; -ac 1: downmix to mono; -ar: resample to sr.
    cmd += ["-i", video_path, "-vn", "-ac", "1", "-ar", str(sr), audio_save_path]
    subprocess.run(cmd, check=True, capture_output=True)
|
|
|
|
def format_srt_time(seconds: float) -> str:
    """Convert a non-negative duration in seconds to the SRT timestamp format HH:MM:SS,mmm."""
    whole = int(seconds)
    millis = int((seconds - whole) * 1000)
    minutes, secs = divmod(whole, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
|
|
|
def generate_srt_from_results(results_log: list, video_duration: float, output_srt_path: str) -> int:
    """
    Write an SRT subtitle file from inference results.

    Time alignment: unit N's output is displayed from (N+1)s to (N+2)s.

    Returns:
        Number of subtitle entries written.
    """
    markers = ("<|tts_pad|>", "<|turn_eos|>", "<|chunk_eos|>", "<|listen|>", "<|speak|>")

    entries = []
    for item in results_log:
        unit = item["chunk_idx"]
        raw_text = item.get("text", "")

        # Listen-only units and units without text produce no subtitle.
        if not raw_text or item.get("is_listen", True):
            continue

        # Strip control tokens before display.
        cleaned = raw_text
        for marker in markers:
            cleaned = cleaned.replace(marker, "")
        cleaned = cleaned.strip()
        if not cleaned:
            continue

        # unit N is visible from second N+1 to N+2, clamped to the video end;
        # entries starting past the end are dropped entirely.
        begin = unit + 1
        if begin >= video_duration:
            continue
        finish = min(unit + 2, video_duration)
        entries.append((begin, finish, cleaned))

    blocks = []
    for index, (begin, finish, cleaned) in enumerate(entries, start=1):
        blocks.extend(
            [str(index), f"{format_srt_time(begin)} --> {format_srt_time(finish)}", cleaned, ""]
        )

    with open(output_srt_path, "w", encoding="utf-8") as handle:
        handle.write("\n".join(blocks))

    return len(entries)
|
|
|
|
def build_ai_audio_file(
    timed_output_audio: list,
    video_duration: float,
    output_sample_rate: int,
) -> str:
    """
    Render the AI speech segments into a single WAV track.

    Time alignment: unit N's audio starts at second (N+1).

    Args:
        timed_output_audio: List of (chunk_idx, waveform) pairs; waveforms are
            assumed to be float arrays in [-1, 1] at output_sample_rate —
            TODO confirm against the producer.
        video_duration: Duration of the source video in seconds.
        output_sample_rate: Sample rate of the waveforms and the output file.

    Returns:
        Path of the temporary WAV file (caller is responsible for deleting it).
    """
    # The track must span at least the video, plus any audio tail past its end.
    max_end_time = 0.0
    for chunk_idx, audio in timed_output_audio:
        segment_start = chunk_idx + 1
        segment_end = segment_start + len(audio) / output_sample_rate
        max_end_time = max(max_end_time, segment_end)

    total_duration = max(video_duration, max_end_time)
    total_samples = int(total_duration * output_sample_rate)
    ai_audio_track = np.zeros(total_samples, dtype=np.float32)

    # Sum each segment into the track; overlapping segments mix additively.
    for chunk_idx, audio in timed_output_audio:
        start_sample = int((chunk_idx + 1) * output_sample_rate)
        end_sample = start_sample + len(audio)

        if end_sample <= len(ai_audio_track):
            ai_audio_track[start_sample:end_sample] += audio
        else:
            # Segment overruns the track: keep only the part that fits.
            available_len = len(ai_audio_track) - start_sample
            if available_len > 0:
                ai_audio_track[start_sample:] += audio[:available_len]

    # Clamp the mixed signal before the integer conversion below.
    ai_audio_track = np.clip(ai_audio_track, -1.0, 1.0)

    # Close the handle immediately so sf.write can reopen the path (Windows-safe).
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp.close()
    ai_audio_path = tmp.name
    # Fix: scale by 32767, not 32768 — a clipped sample at exactly +1.0 would
    # become 32768, which overflows int16 and wraps to -32768 (a full-scale pop).
    sf.write(
        ai_audio_path,
        (ai_audio_track * 32767).astype(np.int16),
        output_sample_rate,
        subtype="PCM_16",
    )

    return ai_audio_path
|
|
|
|
def generate_duplex_video(
    video_path: str,
    output_video_path: str,
    results_log: list,
    timed_output_audio: list,
    output_sample_rate: int = 24000,
):
    """
    Generate a video with duplex responses using ffmpeg (more reliable;
    handles rotation correctly).

    - Mixes the AI-generated speech into the video's audio track.
    - Burns the AI-generated text into the video as subtitles.

    Time-alignment logic:
    - unit N (chunk_idx=N) processes the video input for seconds N~(N+1)
    - the text/speech produced by unit N is displayed/played from second (N+1)
    """
    logger.info(f"=" * 60)
    logger.info(f"Generating duplex video (ffmpeg method)")
    logger.info(f" Input video: {video_path}")
    logger.info(f" Output video: {output_video_path}")
    logger.info(f" Total units: {len(results_log)}")
    logger.info(f" Audio segments: {len(timed_output_audio)}")

    # Probe the container duration with ffprobe; any failure (missing binary,
    # unreadable file, empty stdout -> float("") raises) falls back to 60 s.
    try:
        probe_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "v:0",
            "-show_entries",
            "format=duration",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_cmd, capture_output=True, text=True)
        video_duration = float(result.stdout.strip())
    except Exception as e:
        logger.warning(f" ffprobe duration failed: {e}, using 60s default")
        video_duration = 60.0

    logger.info(f" Video duration: {video_duration:.2f}s")

    # Write the SRT next to the output video; it is removed in the finally block.
    output_dir = os.path.dirname(output_video_path)
    srt_path = os.path.join(output_dir, "subtitles.srt")
    subtitle_count = generate_srt_from_results(results_log, video_duration, srt_path)
    logger.info(f" Generated {subtitle_count} subtitles -> {srt_path}")

    # Render the AI speech into a temporary WAV aligned to the video timeline.
    ai_audio_path = None
    if timed_output_audio:
        ai_audio_path = build_ai_audio_file(timed_output_audio, video_duration, output_sample_rate)
        logger.info(f" Generated AI audio -> {ai_audio_path}")

    # Best-effort probe for an existing audio stream; failures mean "no audio".
    has_original_audio = False
    try:
        probe_audio_cmd = [
            "ffprobe",
            "-v",
            "error",
            "-select_streams",
            "a:0",
            "-show_entries",
            "stream=codec_type",
            "-of",
            "default=noprint_wrappers=1:nokey=1",
            video_path,
        ]
        result = subprocess.run(probe_audio_cmd, capture_output=True, text=True)
        has_original_audio = result.stdout.strip() == "audio"
    except Exception:
        pass
    logger.info(f" Original video has audio: {has_original_audio}")

    has_subtitles = subtitle_count > 0 and os.path.exists(srt_path)

    if has_subtitles:
        # Escape the path for ffmpeg's subtitles filter: backslashes, single
        # quotes, and (drive-letter) colons must all be protected.
        srt_path_escaped = srt_path.replace("\\", "\\\\").replace("'", "'\\''").replace(":", "\\:")
        subtitle_filter = (
            f"subtitles='{srt_path_escaped}':"
            f"force_style='FontSize=28,"
            f"PrimaryColour=&H00FFFFFF,"
            f"OutlineColour=&H00000000,"
            f"BorderStyle=3,"
            f"Outline=2,"
            f"Shadow=1,"
            f"MarginV=30,"
            f"Alignment=2'"
        )
    else:
        logger.info(f" No subtitles to add")

    # Assemble the ffmpeg command: input 0 is the video, input 1 (if present)
    # is the AI audio track.
    cmd = ["ffmpeg", "-y", "-i", video_path]

    if ai_audio_path:
        cmd.extend(["-i", ai_audio_path])

        if has_original_audio:
            # Mix original audio [0:a] with AI audio [1:a].
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout];[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "[aout]"])
            else:
                filter_complex = f"[0:a][1:a]amix=inputs=2:duration=longest[aout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "0:v", "-map", "[aout]"])
        else:
            # No original audio: the AI track (input 1) is the only audio.
            if has_subtitles:
                filter_complex = f"[0:v]{subtitle_filter}[vout]"
                cmd.extend(["-filter_complex", filter_complex, "-map", "[vout]", "-map", "1:a"])
            else:
                cmd.extend(["-map", "0:v", "-map", "1:a"])
    else:
        # No AI audio: optionally burn subtitles, keep original audio if any.
        if has_subtitles:
            cmd.extend(["-vf", subtitle_filter])
        if has_original_audio:
            # NOTE(review): this "-c:a copy" is overridden by the later
            # "-c:a aac" (ffmpeg uses the last -c:a), so the original audio is
            # re-encoded anyway — confirm whether copy was intended.
            cmd.extend(["-c:a", "copy"])

    cmd.extend(["-c:v", "libx264", "-c:a", "aac", "-preset", "medium", "-crf", "23", output_video_path])

    logger.info(f" Running ffmpeg: {' '.join(cmd[:6])}...")
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        logger.info(f" ✓ Video generated successfully")
    except subprocess.CalledProcessError as e:
        logger.error(f" ✗ ffmpeg failed!")
        logger.error(f" stderr: {e.stderr}")
        raise
    finally:
        # Always remove the intermediate SRT and AI WAV, even on failure.
        if os.path.exists(srt_path):
            os.remove(srt_path)
            logger.info(f" Cleaned up: {srt_path}")
        if ai_audio_path and os.path.exists(ai_audio_path):
            os.remove(ai_audio_path)
            logger.info(f" Cleaned up: {ai_audio_path}")

    logger.info(f" ✓ Duplex video saved: {output_video_path}")
    logger.info(f"=" * 60)
    return output_video_path
|
|
|
|
def adjust_audio_length(audio_path: str, num_frames: int, output_path: str, sr: int = 16000) -> str:
    """
    Adjust an audio file's length to match the frame count.
    - If the audio is too short -> pad with silence.
    - If the audio is too long  -> truncate.

    Args:
        audio_path: Path of the source audio.
        num_frames: Frame count (extraction is 1 fps, so frames == seconds).
        output_path: Path of the adjusted audio file.
        sr: Target sample rate in Hz.

    Returns:
        Path of the adjusted audio file (== output_path).
    """
    # Fix: dropped the redundant local `import numpy` / `import soundfile` —
    # they shadowed the identical module-level imports for no benefit.
    audio, orig_sr = sf.read(audio_path)

    # Downmix multi-channel audio to mono by averaging channels.
    if len(audio.shape) > 1:
        audio = audio.mean(axis=1)

    # Resample when the source rate differs from the target rate.
    if orig_sr != sr:
        # scipy stays a lazy import so the module loads without it installed.
        import scipy.signal as signal

        audio = signal.resample(audio, int(len(audio) * sr / orig_sr))

    target_length = num_frames * sr
    current_length = len(audio)

    if current_length < target_length:
        # Pad the tail with silence up to the target length.
        padding = np.zeros(target_length - current_length)
        audio = np.concatenate([audio, padding])
        logger.info(f" 音频补充静音: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    elif current_length > target_length:
        # Drop the excess tail.
        audio = audio[:target_length]
        logger.info(f" 音频截断: {current_length / sr:.2f}s -> {target_length / sr:.2f}s")
    else:
        logger.info(f" 音频长度匹配: {current_length / sr:.2f}s")

    sf.write(output_path, audio, sr)
    return output_path
|
|
|
|
def get_frames_and_audio(video_path, item_output_dir, sample_rate=16000):
    """
    Ensure frames and audio extracted from a video exist, then length-match the audio.

    Reuses previously extracted artifacts under item_output_dir when present;
    otherwise extracts frames at 1 fps and mono audio at sample_rate.

    Args:
        video_path: Source video file.
        item_output_dir: Directory holding the per-item artifacts.
        sample_rate: Audio sample rate in Hz.

    Returns:
        Tuple of (frame directory, frame count, adjusted audio path).
    """
    frame_path = os.path.join(item_output_dir, "input_frames")
    audio_path = os.path.join(item_output_dir, "input_audio.wav")
    adjusted_audio_path = os.path.join(item_output_dir, "adjusted_audio.wav")

    # Fix: os.path.isfile() already implies existence, so the original's extra
    # `or not os.path.exists(audio_path)` clause was redundant and is removed.
    if not os.path.exists(frame_path) or not os.path.isfile(audio_path):
        logger.info(f"No frames in {frame_path}, or audio files in {audio_path}, regenerated it")
        os.makedirs(frame_path, exist_ok=True)
        video_to_frames(video_path, frame_path, fps=1)

        video_to_audio(video_path, audio_path, sr=sample_rate)
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)
        logger.info(f"Extracted audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")
    else:
        audio_array, _ = librosa.load(audio_path, sr=sample_rate, mono=True)

    frame_files = sorted([f for f in os.listdir(frame_path) if f.endswith(".jpg")])
    num_frames = len(frame_files)

    logger.info(f"get {num_frames} frames to {frame_path}")
    logger.info(f"get audio ({len(audio_array) / sample_rate:.2f}s) to {audio_path}")

    # 1 fps extraction means frame count == seconds, so pad/trim the audio to match.
    logger.info(f"adjust audio...")
    audio_path = adjust_audio_length(audio_path, num_frames, adjusted_audio_path)

    return frame_path, num_frames, audio_path
|
|