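# app.py: Gradio app for edge-tts text-to-speech with auto-generated subtitles
# and a word-highlight ("karaoke") video overlay.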
import gradio as gr
import edge_tts
import asyncio
import tempfile
import os
import librosa
import shutil
import srt
import datetime
import re
import json
import subprocess
from PIL import Image, ImageDraw, ImageFont
from pydub import AudioSegment
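
# Paths to the most recently generated artifacts, shared between the Gradio callbacks.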
state = {"audio": None, "srt": None, "json": None}
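
# Query edge-tts for its voice catalogue and map a human-readable label to each ShortName.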
async def get_voices():
    voices = await edge_tts.list_voices()
    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}
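
# Split the input into sentence-sized segments, synthesize them concurrently
# with edge-tts, then stitch the MP3 pieces together with pydub.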
async def split_text_and_generate_audio(text, voice, rate, pitch):
    voice_short = voice.split(" - ")[0]
    rate_str = f"{int(rate):+d}%"
    pitch_str = f"{int(pitch):+d}Hz"
    segments = re.split(r'(?<=[.?!])\s+|\n+', text.strip())

    async def synthesize(seg_text, idx):
        tmp_path = tempfile.NamedTemporaryFile(delete=False, suffix=f"_{idx}.mp3").name
        communicate = edge_tts.Communicate(seg_text.strip(), voice_short, rate=rate_str, pitch=pitch_str)
        await communicate.save(tmp_path)
        return tmp_path

    tasks = [synthesize(seg, i) for i, seg in enumerate(segments) if seg.strip()]
    mp3_paths = await asyncio.gather(*tasks)
    final_audio = AudioSegment.empty()
    for path in mp3_paths:
        final_audio += AudioSegment.from_file(path, format="mp3")
        os.remove(path)
    out_path = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3").name
    final_audio.export(out_path, format="mp3", bitrate="64k")
    return out_path
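
# Split text into subtitle blocks of at most 8 words per line.
# (Currently unused; generate_srt applies the same blocking inline.)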
def smart_segment_text(text):
    sentences = re.split(r'(?<=[.?!])\s+', text.strip())
    blocks = []
    for sentence in sentences:
        words = sentence.strip().split()
        i = 0
        while i < len(words):
            end = min(i + 8, len(words))
            # If taking 8 words now would strand a tail of fewer than 5 words,
            # take 5 instead so the final block stays readable. (The original
            # condition here compared against the current block and never fired.)
            if 0 < len(words) - (i + 8) < 5:
                end = i + 5
            blocks.append(" ".join(words[i:end]))
            i = end
    return blocks
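
# Heuristic subtitle timing: detect non-silent spans in the audio, assume they
# line up 1:1 with the input sentences, and spread each sentence's 8-word
# blocks evenly across its span.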
def generate_srt(audio_path, input_text):
    y, sr = librosa.load(audio_path)
    intervals = librosa.effects.split(y, top_db=30)
    raw_sentences = [s for s in re.split(r'(?<=[.?!])\s+', input_text.strip()) if s.strip()]
    final_blocks = []
    sentence_map = []
    for sent_idx, sentence in enumerate(raw_sentences):
        words = sentence.strip().split()
        i = 0
        while i < len(words):
            end = min(i + 8, len(words))
            final_blocks.append(" ".join(words[i:end]))
            sentence_map.append(sent_idx)
            i = end
    subs = []
    sentence_intervals = intervals[:len(raw_sentences)]
    block_idx = 0
    for sent_idx, (start_idx, end_idx) in enumerate(sentence_intervals):
        start_time = start_idx / sr
        end_time = end_idx / sr
        sentence_blocks = [final_blocks[i] for i, s_idx in enumerate(sentence_map) if s_idx == sent_idx]
        num_blocks = len(sentence_blocks)
        if num_blocks == 0:
            continue  # guard against a sentence that produced no blocks
        seg_duration = (end_time - start_time) / num_blocks
        for i in range(num_blocks):
            b_start = start_time + i * seg_duration
            b_end = b_start + seg_duration
            subs.append(srt.Subtitle(
                index=block_idx + 1,
                start=datetime.timedelta(seconds=round(b_start, 3)),
                end=datetime.timedelta(seconds=round(b_end, 3)),
                content=sentence_blocks[i]
            ))
            block_idx += 1
    return srt.compose(subs)
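
# Write the composed SRT text to a temp file and return its path.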
def save_srt(srt_text):
    with tempfile.NamedTemporaryFile(delete=False, suffix=".srt", mode='w', encoding='utf-8') as f:
        f.write(srt_text)
    return f.name
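
# Approximate per-word timings by dividing each subtitle's interval evenly
# among its words; make_video uses these to highlight the spoken word.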
def generate_word_json(srt_text):
    subtitles = list(srt.parse(srt_text))
    data = []
    for sub in subtitles:
        words = sub.content.strip().split()
        start = sub.start.total_seconds()
        end = sub.end.total_seconds()
        duration = end - start
        if not words:
            continue
        word_time = duration / len(words)
        for i, word in enumerate(words):
            word_start = start + i * word_time
            word_end = word_start + word_time
            data.append({
                "word": word,
                "start": round(word_start, 3),
                "end": round(word_end, 3),
                "line": sub.index
            })
    path = tempfile.NamedTemporaryFile(delete=False, suffix=".json").name
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f)
    return path
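
# Render frames of subtitle text on a solid green background (white words, with
# the currently spoken word in yellow), then mux frames and audio into an MP4
# with ffmpeg.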
def make_video():
    if not (state["audio"] and state["srt"] and state["json"]):
        return None
    with open(state["json"], encoding='utf-8') as f:
        word_data = json.load(f)
    with open(state["srt"], encoding='utf-8') as f:
        subtitles = list(srt.parse(f.read()))
    width, height, fps = 1280, 100, 30
    font_path = "OpenSans-Regular.ttf"
    try:
        font = ImageFont.truetype(font_path, 36)
    except OSError:
        font = ImageFont.load_default()
    duration = max(w["end"] for w in word_data) + 0.5
    frame_count = int(duration * fps)
    # Write frames to RAM-backed /dev/shm when available to cut disk I/O.
    ram_tmp = "/dev/shm" if os.path.exists("/dev/shm") else tempfile.gettempdir()
    out_dir = tempfile.mkdtemp(dir=ram_tmp)
    for frame in range(frame_count):
        t = frame / fps
        base = Image.new("RGB", (width, height), (0, 255, 0))  # green background
        overlay = Image.new("RGBA", (width, height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(overlay)
        active_line = None
        for sub in subtitles:
            if sub.start.total_seconds() <= t <= sub.end.total_seconds():
                active_line = sub
                break
        if active_line:
            words = active_line.content.strip().split()
            x_cursor = width // 2
            y_text = height // 2  # white and yellow passes share the same Y for the highlight effect
            start = active_line.start.total_seconds()
            end = active_line.end.total_seconds()
            # Fade the line in and out over 0.25 s at each edge of its interval.
            fade_duration = 0.25
            if t < start + fade_duration:
                alpha = int(255 * (t - start) / fade_duration)
            elif t > end - fade_duration:
                alpha = int(255 * (end - t) / fade_duration)
            else:
                alpha = 255
            alpha = max(0, min(alpha, 255))
            word_widths = [draw.textbbox((0, 0), w + " ", font=font)[2] for w in words]
            total_width = sum(word_widths)
            x_cursor -= total_width // 2
            x_pos = x_cursor
            # Step 1: draw all words in white
            for w, w_width in zip(words, word_widths):
                draw.text((x_pos, y_text), w + " ", font=font, fill=(255, 255, 255, alpha))
                x_pos += w_width
            # Step 2: redraw only the currently spoken word in yellow, on top of the white pass.
            # Match words by position within the line so a repeated word is not
            # highlighted at every occurrence at once.
            line_words = [wd for wd in word_data if wd["line"] == active_line.index]
            x_pos = x_cursor
            for i, (w, w_width) in enumerate(zip(words, word_widths)):
                if i < len(line_words) and line_words[i]["start"] <= t <= line_words[i]["end"]:
                    draw.text((x_pos, y_text), w + " ", font=font, fill=(255, 255, 0, alpha))
                x_pos += w_width
        base = base.convert("RGBA")
        base.alpha_composite(overlay)
        base.convert("RGB").save(os.path.join(out_dir, f"frame_{frame:05d}.png"))
    output_video = os.path.join(tempfile.gettempdir(), "final_output.mp4")
    subprocess.call([
        "ffmpeg", "-y", "-r", str(fps), "-f", "image2",
        "-i", os.path.join(out_dir, "frame_%05d.png"),
        "-i", state["audio"],
        "-c:v", "libx264", "-preset", "ultrafast", "-threads", "2", "-pix_fmt", "yuv420p",
        "-c:a", "aac", "-shortest", output_video
    ])
    shutil.rmtree(out_dir, ignore_errors=True)  # free the RAM-backed frame directory
    return output_video
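
# Gradio callback: synthesize audio, build the .srt and per-word .json, and
# stash the paths for the video step.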
def tts_interface(text, voice, rate, pitch):
    audio = asyncio.run(split_text_and_generate_audio(text, voice, rate, pitch))
    srt_text = generate_srt(audio, text)
    srt_path = save_srt(srt_text)
    json_path = generate_word_json(srt_text)
    state["audio"] = audio
    state["srt"] = srt_path
    state["json"] = json_path
    return audio, srt_path, json_path, None, ""
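
# Build the Gradio UI. The voice list is fetched once at startup.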
async def create_demo():
    voices = await get_voices()
    with gr.Blocks() as demo:
        gr.Markdown("### 🎙️ TTS + Subtitle + Animated Video Generator")
        with gr.Row():
            txt = gr.Textbox(label="Text", lines=4)
        with gr.Row():
            v = gr.Dropdown(choices=list(voices.keys()), label="Voice")
            r = gr.Slider(-50, 50, 0, label="Rate (%)")
            p = gr.Slider(-20, 20, 0, label="Pitch (Hz)")
        b = gr.Button("Generate Audio + Subtitles")
        a = gr.Audio(label="Audio", type="filepath")
        srt_file = gr.File(label=".srt File")  # renamed from `srt` to avoid shadowing the srt module
        j = gr.File(label=".json Word Timing")
        video = gr.Video(label="🎥 Final Video")
        warn = gr.Markdown(visible=False)
        b.click(tts_interface, [txt, v, r, p], [a, srt_file, j, video, warn])
        make_vid = gr.Button("🎥 Make Video")
        make_vid.click(make_video, None, video)
    return demo

if __name__ == "__main__":
    demo = asyncio.run(create_demo())
    demo.launch()