Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import edge_tts | |
| import asyncio | |
| import tempfile | |
| import os | |
| import librosa | |
| import numpy as np | |
| import srt | |
| import datetime | |
| import re | |
| import json | |
| import subprocess | |
| from PIL import Image, ImageDraw, ImageFont | |
| from pydub import AudioSegment | |
# Module-level cache of the most recently generated artifact paths
# (audio MP3, .srt subtitle file, .json word-timing file).
# Filled by tts_interface() and consumed by make_video().
state = {"audio": None, "srt": None, "json": None}
async def get_voices():
    """Fetch the edge-tts voice catalogue.

    Returns:
        Mapping from a display label ("ShortName - Locale (Gender)") to the
        voice's ShortName, as required by edge_tts.Communicate.
    """
    catalogue = await edge_tts.list_voices()
    labels = {}
    for voice in catalogue:
        label = f"{voice['ShortName']} - {voice['Locale']} ({voice['Gender']})"
        labels[label] = voice['ShortName']
    return labels
async def split_text_and_generate_audio(text, voice, rate, pitch):
    """Synthesize *text* with edge-tts and return the path of one merged MP3.

    The text is split after sentence punctuation (or on newlines), each
    segment is synthesized concurrently, and the pieces are concatenated
    in their original order.

    Args:
        text: Input text to speak.
        voice: Display label produced by get_voices() ("ShortName - ...").
        rate: Speaking-rate delta in percent (int-compatible).
        pitch: Pitch delta in Hz (int-compatible).

    Returns:
        Path to the concatenated MP3 file (exported at 64 kbit/s).
    """
    voice_short = voice.split(" - ")[0]
    rate_str = f"{int(rate):+d}%"
    pitch_str = f"{int(pitch):+d}Hz"
    # Split after sentence-ending punctuation, or on blank lines.
    segments = re.split(r'(?<=[.?!])\s+|\n+', text.strip())

    async def synthesize(seg_text, idx):
        # mkstemp + close: NamedTemporaryFile(delete=False).name would leave
        # an open file handle dangling until garbage collection.
        fd, tmp_path = tempfile.mkstemp(suffix=f"_{idx}.mp3")
        os.close(fd)
        communicate = edge_tts.Communicate(seg_text.strip(), voice_short, rate=rate_str, pitch=pitch_str)
        await communicate.save(tmp_path)
        return tmp_path

    tasks = [synthesize(seg, i) for i, seg in enumerate(segments) if seg.strip()]
    mp3_paths = await asyncio.gather(*tasks)
    final_audio = AudioSegment.empty()
    try:
        for path in mp3_paths:
            final_audio += AudioSegment.from_file(path, format="mp3")
    finally:
        # Remove the per-segment files even if decoding fails part-way
        # through; otherwise they accumulate in the temp directory.
        for path in mp3_paths:
            if os.path.exists(path):
                os.remove(path)
    fd, out_path = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)
    final_audio.export(out_path, format="mp3", bitrate="64k")
    return out_path
def smart_segment_text(text):
    """Split *text* into subtitle blocks of at most 8 words.

    Sentences are detected after ./?/! boundaries; each sentence is then
    chunked into consecutive 8-word blocks (a sentence's final block may
    be shorter).

    Note: the previous implementation carried a re-balancing branch
    (``if len(block) < 5 and i + 8 < len(words)``) that was unreachable —
    ``len(block) < 5`` implies ``end == len(words)`` and therefore
    ``i + 8 > len(words)`` — so it has been removed; behavior is unchanged.

    Args:
        text: Arbitrary input text.

    Returns:
        List of space-joined word blocks, in reading order.
    """
    sentences = re.split(r'(?<=[.?!])\s+', text.strip())
    blocks = []
    for sentence in sentences:
        words = sentence.strip().split()
        for start in range(0, len(words), 8):
            blocks.append(" ".join(words[start:start + 8]))
    return blocks
def generate_srt(audio_path, input_text):
    """Create SRT subtitle text for *audio_path* from *input_text*.

    Non-silent intervals are detected with librosa and paired one-to-one
    with the sentences of the input text (extra intervals are ignored);
    each sentence's interval is then divided evenly among its 8-word blocks.

    Args:
        audio_path: Path to the synthesized audio file.
        input_text: The exact text that was synthesized.

    Returns:
        The composed SRT document as a string.
    """
    y, sr = librosa.load(audio_path)
    # top_db=30: anything more than 30 dB below peak counts as silence.
    intervals = librosa.effects.split(y, top_db=30)
    raw_sentences = re.split(r'(?<=[.?!])\s+', input_text.strip())
    final_blocks = []
    sentence_map = []  # parallel list: the sentence index each block belongs to
    for sent_idx, sentence in enumerate(raw_sentences):
        words = sentence.strip().split()
        i = 0
        while i < len(words):
            end = min(i + 8, len(words))
            final_blocks.append(" ".join(words[i:end]))
            sentence_map.append(sent_idx)
            i = end
    subs = []
    # NOTE(review): assumes librosa yields at least one interval per sentence;
    # sentences beyond len(intervals) are silently dropped — confirm acceptable.
    sentence_intervals = intervals[:len(raw_sentences)]
    block_idx = 0
    for sent_idx, (start_idx, end_idx) in enumerate(sentence_intervals):
        sentence_blocks = [final_blocks[i] for i, s_idx in enumerate(sentence_map) if s_idx == sent_idx]
        num_blocks = len(sentence_blocks)
        if num_blocks == 0:
            # Empty sentence (e.g. stray punctuation): nothing to display,
            # and the division below would raise ZeroDivisionError.
            continue
        start_time = start_idx / sr
        end_time = end_idx / sr
        seg_duration = (end_time - start_time) / num_blocks
        for i in range(num_blocks):
            b_start = start_time + i * seg_duration
            b_end = b_start + seg_duration
            subs.append(srt.Subtitle(
                index=block_idx + 1,
                start=datetime.timedelta(seconds=round(b_start, 3)),
                end=datetime.timedelta(seconds=round(b_end, 3)),
                content=sentence_blocks[i]
            ))
            block_idx += 1
    return srt.compose(subs)
def save_srt(srt_text):
    """Write *srt_text* to a persistent temporary .srt file.

    Args:
        srt_text: A complete SRT document as a string.

    Returns:
        Path of the UTF-8 encoded .srt file (caller owns deletion).
    """
    handle = tempfile.NamedTemporaryFile(
        delete=False, suffix=".srt", mode='w', encoding='utf-8'
    )
    with handle as out:
        out.write(srt_text)
    return handle.name
def generate_word_json(srt_text):
    """Derive per-word timings from an SRT document.

    Each subtitle's duration is split evenly across its words; the result
    is written to a temporary JSON file as a list of
    ``{"word", "start", "end", "line"}`` records.

    Args:
        srt_text: A complete SRT document as a string.

    Returns:
        Path of the JSON timing file.
    """
    entries = []
    for sub in srt.parse(srt_text):
        tokens = sub.content.strip().split()
        if not tokens:
            continue
        begin = sub.start.total_seconds()
        per_word = (sub.end.total_seconds() - begin) / len(tokens)
        for pos, token in enumerate(tokens):
            w_start = begin + pos * per_word
            entries.append({
                "word": token,
                "start": round(w_start, 3),
                "end": round(w_start + per_word, 3),
                "line": sub.index
            })
    path = tempfile.NamedTemporaryFile(delete=False, suffix=".json").name
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(entries, f)
    return path
def make_video():
    """Render a karaoke-style subtitle strip video over a green background.

    Reads the audio/.srt/.json paths from the module-level ``state``,
    renders one PNG per frame (the active subtitle line in white, the
    currently-spoken word overlaid in yellow, alpha fade at line edges),
    then muxes the frames with the audio via ffmpeg.

    Returns:
        Path of the final MP4, or None if prerequisites are missing or
        ffmpeg exits with an error.
    """
    import shutil  # local import: only needed for frame-dir cleanup

    if not (state["audio"] and state["srt"] and state["json"]):
        return None
    with open(state["json"]) as f:
        word_data = json.load(f)
    with open(state["srt"]) as f:
        subtitles = list(srt.parse(f.read()))
    width, height, fps = 1280, 100, 30
    font_path = "OpenSans-Regular.ttf"
    try:
        font = ImageFont.truetype(font_path, 36)
    except OSError:
        # Bundled font missing: fall back to PIL's built-in bitmap font.
        font = ImageFont.load_default()
    duration = max(w["end"] for w in word_data) + 0.5
    frame_count = int(duration * fps)
    # Prefer a RAM-backed directory for the many intermediate frames.
    ram_tmp = "/dev/shm" if os.path.exists("/dev/shm") else tempfile.gettempdir()
    out_dir = tempfile.mkdtemp(dir=ram_tmp)
    try:
        for frame in range(frame_count):
            t = frame / fps
            base = Image.new("RGB", (width, height), (0, 255, 0))  # green background (chroma key)
            overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0))
            draw = ImageDraw.Draw(overlay)
            active_line = None
            for sub in subtitles:
                if sub.start.total_seconds() <= t <= sub.end.total_seconds():
                    active_line = sub
                    break
            if active_line:
                words = active_line.content.strip().split()
                start = active_line.start.total_seconds()
                end = active_line.end.total_seconds()
                # Linear fade in/out over the first/last 0.25 s of the line.
                fade_duration = 0.25
                if t < start + fade_duration:
                    alpha = int(255 * (t - start) / fade_duration)
                elif t > end - fade_duration:
                    alpha = int(255 * (end - t) / fade_duration)
                else:
                    alpha = 255
                alpha = max(0, min(alpha, 255))
                y_text = height // 2  # white and yellow share the same Y for the highlight effect
                word_widths = [draw.textbbox((0, 0), w + " ", font=font)[2] for w in words]
                x_cursor = width // 2 - sum(word_widths) // 2  # center the line
                # Step 1: draw all words in white.
                x_pos = x_cursor
                for w, w_width in zip(words, word_widths):
                    draw.text((x_pos, y_text), w + " ", font=font, fill=(255, 255, 255, alpha))
                    x_pos += w_width
                # Step 2: overlay the currently-spoken word in yellow.
                # Match timing entries by position within the line rather than
                # by word text, so a word repeated in one line no longer
                # highlights every occurrence at once.
                line_timings = [wd for wd in word_data if wd["line"] == active_line.index]
                x_pos = x_cursor
                for pos, (w, w_width) in enumerate(zip(words, word_widths)):
                    if pos < len(line_timings):
                        wd = line_timings[pos]
                        if wd["start"] <= t <= wd["end"]:
                            draw.text((x_pos, y_text), w + " ", font=font, fill=(255, 255, 0, alpha))
                    x_pos += w_width
            base = base.convert("RGBA")
            base.alpha_composite(overlay)
            base.convert("RGB").save(os.path.join(out_dir, f"frame_{frame:05d}.png"))
        output_video = os.path.join(tempfile.gettempdir(), "final_output.mp4")
        result = subprocess.run([
            "ffmpeg", "-y", "-r", str(fps), "-f", "image2",
            "-i", os.path.join(out_dir, "frame_%05d.png"),
            "-i", state["audio"],
            "-c:v", "libx264", "-preset", "ultrafast", "-threads", "2", "-pix_fmt", "yuv420p",
            "-c:a", "aac", "-shortest", output_video
        ])
        # Previously the return code was ignored; surface ffmpeg failure.
        if result.returncode != 0:
            return None
        return output_video
    finally:
        # The frame directory can hold thousands of PNGs (possibly in
        # /dev/shm): always remove it, even on error.
        shutil.rmtree(out_dir, ignore_errors=True)
def tts_interface(text, voice, rate, pitch):
    """Gradio click handler: synthesize audio, build subtitles and word
    timings, cache the artifact paths in the module-level ``state``, and
    return the values for the UI outputs (audio, srt file, json file,
    cleared video, cleared warning)."""
    audio_path = asyncio.run(split_text_and_generate_audio(text, voice, rate, pitch))
    subtitle_text = generate_srt(audio_path, text)
    subtitle_path = save_srt(subtitle_text)
    timing_path = generate_word_json(subtitle_text)
    state["audio"] = audio_path
    state["srt"] = subtitle_path
    state["json"] = timing_path
    return audio_path, subtitle_path, timing_path, None, ""
async def create_demo():
    """Build and return the Gradio Blocks UI.

    Async because the voice catalogue is fetched from edge-tts with await.
    """
    voices = await get_voices()
    with gr.Blocks() as demo:
        gr.Markdown("### 🎙️ TTS + Subtitle + Animated Video Generator")
        with gr.Row():
            txt = gr.Textbox(label="Text", lines=4)
        with gr.Row():
            v = gr.Dropdown(choices=list(voices.keys()), label="Voice")
            r = gr.Slider(-50, 50, 0, label="Rate (%)")
            p = gr.Slider(-20, 20, 0, label="Pitch (Hz)")
        b = gr.Button("Generate Audio + Subtitles")
        a = gr.Audio(label="Audio", type="filepath")
        # Renamed from `srt`: the old local name shadowed the imported
        # srt module within this function.
        srt_file = gr.File(label=".srt File")
        j = gr.File(label=".json Word Timing")
        video = gr.Video(label="🎥 Final Video")
        warn = gr.Markdown(visible=False)
        b.click(tts_interface, [txt, v, r, p], [a, srt_file, j, video, warn])
        make_vid = gr.Button("🎥 Make Video")
        make_vid.click(make_video, None, video)
    return demo
if __name__ == "__main__":
    # Building the UI needs an event loop (the voice list is fetched async);
    # then serve it with Gradio's default settings.
    demo = asyncio.run(create_demo())
    demo.launch()