# Video stitching and captioning pipeline: joins rendered segments with
# ffmpeg (crossfades or hard cuts), muxes the original audio, and overlays
# kinetic captions via pycaps.
import os
import subprocess
import json
def stitch_and_caption(
    segment_videos,
    audio_path,
    transcription_segments,
    template_name,
    work_dir=".",
    crossfade_duration=0.25
):
    """
    Stitch video segments with crossfade transitions, add original audio, and overlay kinetic captions.

    Args:
        segment_videos (list): List of file paths for the video segments.
        audio_path (str): Path to the original audio file.
        transcription_segments (list): The list of segment dictionaries from segment.py, including text and word timestamps.
        template_name (str): The name of the PyCaps template to use.
        work_dir (str): The working directory for temporary and final files.
        crossfade_duration (float): Duration of crossfade transitions in seconds (0 for hard cuts).

    Returns:
        str: The path to the final subtitled video.

    Raises:
        RuntimeError: If there are no segments, a segment is not longer than
            the crossfade, or the ffmpeg / pycaps subprocess fails.
    """
    if not segment_videos:
        raise RuntimeError("No video segments to stitch.")
    stitched_path = os.path.join(work_dir, "stitched.mp4")
    final_path = os.path.join(work_dir, "final_video.mp4")

    # 1. Stitch video segments together (crossfades or hard cuts) and mux in the audio.
    print("Stitching video segments with crossfades...")
    try:
        if crossfade_duration <= 0:
            # Hard cuts: the concat demuxer preserves exact segment timing and
            # lets us stream-copy the video (no re-encode), so it is both
            # faster and more frame-accurate than a filter graph.
            concat_file = _write_concat_list(segment_videos, work_dir)
            cmd = [
                "ffmpeg", "-y",
                "-f", "concat",
                "-safe", "0",
                "-i", concat_file,
                "-i", audio_path,
                "-c:v", "copy",  # Copy video stream without re-encoding for speed
                "-c:a", "aac",
                "-b:a", "192k",
                "-map", "0:v",
                "-map", "1:a",
                "-shortest",
                stitched_path
            ]
        else:
            # Crossfades need accurate per-segment durations to compute offsets.
            durations = [_get_video_duration(seg) for seg in segment_videos]
            # xfade requires every segment to outlast the crossfade; otherwise
            # the cumulative offsets go backwards and ffmpeg fails obscurely.
            if len(segment_videos) > 1 and any(d <= crossfade_duration for d in durations):
                raise RuntimeError(
                    "Every segment must be longer than the crossfade duration "
                    f"({crossfade_duration}s); got durations {durations}."
                )
            filter_complex_str = _build_xfade_filter(durations, crossfade_duration)
            cmd = ["ffmpeg", "-y"]
            for seg in segment_videos:
                cmd.extend(["-i", seg])
            cmd.extend(["-i", audio_path])  # Original audio is the last input
            cmd.extend([
                "-filter_complex", filter_complex_str,
                "-map", "[video]",  # Map the final video stream
                # Audio input index equals the number of video inputs.
                "-map", f"{len(segment_videos)}:a",
                "-c:v", "libx264",
                "-crf", "18",
                "-preset", "fast",
                "-c:a", "aac",
                "-b:a", "192k",
                "-shortest",  # Finish encoding when the shortest stream ends
                stitched_path
            ])
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        print("Error during ffmpeg stitching:")
        print("FFMPEG stdout:", e.stdout)
        print("FFMPEG stderr:", e.stderr)
        raise RuntimeError("FFMPEG stitching failed.") from e

    # 2. Use PyCaps to render captions on the stitched video.
    print("Overlaying kinetic subtitles...")
    # Save the real transcription data to a JSON file for PyCaps.
    transcription_json_path = os.path.join(work_dir, "transcription_for_pycaps.json")
    _save_whisper_json(transcription_segments, transcription_json_path)
    try:
        pycaps_cmd = [
            "pycaps", "render",
            "--input", stitched_path,
            "--template", os.path.join("templates", template_name),
            "--whisper-json", transcription_json_path,
            "--output", final_path
        ]
        subprocess.run(pycaps_cmd, check=True, capture_output=True, text=True)
    except FileNotFoundError:
        raise RuntimeError("`pycaps` command not found. Make sure pycaps is installed correctly (e.g., `pip install git+https://github.com/francozanardi/pycaps.git`).")
    except subprocess.CalledProcessError as e:
        print("Error during PyCaps subtitle rendering:")
        print("PyCaps stdout:", e.stdout)
        print("PyCaps stderr:", e.stderr)
        raise RuntimeError("PyCaps rendering failed.") from e
    return final_path


def _write_concat_list(segment_videos, work_dir):
    """Write an ffmpeg concat-demuxer list file and return its path."""
    concat_file = os.path.join(work_dir, "concat_list.txt")
    with open(concat_file, "w") as f:
        for seg_file in segment_videos:
            f.write(f"file '{os.path.abspath(seg_file)}'\n")
    return concat_file


def _build_xfade_filter(durations, cross_dur):
    """Build a filter_complex string chaining xfade transitions over N inputs.

    Produces a final ``[video]`` output stream in yuv420p. For a single
    input no transition is needed, only the pixel-format normalization.
    """
    if len(durations) == 1:
        return "[0:v]format=yuv420p[video]"
    parts = []
    last = "[0:v]"
    offset = 0.0
    for i in range(len(durations) - 1):
        # Each transition starts cross_dur before the current segment ends.
        offset += durations[i] - cross_dur
        out = f"v{i + 1}"
        parts.append(
            f"{last}[{i + 1}:v]"
            f"xfade=transition=fade:duration={cross_dur}:offset={offset}"
            f"[{out}]"
        )
        last = f"[{out}]"
    return ";".join(parts) + f";{last}format=yuv420p[video]"
def _get_video_duration(file_path):
"""Get video duration in seconds using ffprobe."""
try:
cmd = [
"ffprobe", "-v", "error",
"-select_streams", "v:0",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
file_path
]
output = subprocess.check_output(cmd, text=True).strip()
return float(output)
except (subprocess.CalledProcessError, FileNotFoundError, ValueError) as e:
print(f"Warning: Could not get duration for {file_path}. Error: {e}. Falling back to 0.0.")
return 0.0
def _save_whisper_json(transcription_segments, json_path):
"""
Saves the transcription segments into a Whisper-formatted JSON file for PyCaps.
Args:
transcription_segments (list): A list of segment dictionaries, each containing
'start', 'end', 'text', and 'words' keys.
json_path (str): The file path to save the JSON data.
"""
print(f"Saving transcription to {json_path} for subtitling...")
# The structure pycaps expects is a dictionary with a "segments" key,
# which contains the list of segment dictionaries.
output_data = {
"text": " ".join([seg.get('text', '') for seg in transcription_segments]),
"segments": transcription_segments,
"language": "en"
}
try:
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(output_data, f, ensure_ascii=False, indent=2)
except Exception as e:
raise RuntimeError(f"Failed to write transcription JSON file at {json_path}") from e |