Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
import os | |
import random | |
import re | |
import shutil | |
import time | |
from abc import ABC, abstractmethod | |
from datetime import datetime | |
from pathlib import Path | |
import gradio as gr | |
from elevenlabs_helper import ElevenLabsHelper | |
# --- | |
talk_key = "talk" | |
# --- | |
valid_image_exts = (".jpg", ".jpeg", ".png", ".bmp", ".tiff", ".webp") | |
def is_image(file_path): | |
return file_path.lower().endswith(valid_image_exts) | |
def get_formatted_datetime_name() -> str: | |
d = datetime.now() | |
return d.strftime("d%y%m%d" + "-" + "t%H%M%S") | |
def get_name_ext(filepath): | |
filepath = os.path.abspath(filepath) | |
_, name_ext = os.path.split(filepath) | |
name, ext = os.path.splitext(name_ext) | |
return name, ext | |
def sanitize_string(string): | |
sanitized_string = re.sub(r"[^A-Za-z0-9]", "", string) | |
max_len = 15 | |
return sanitized_string[:max_len] | |
def get_output_video_name( | |
input_base_path, input_driving_path, base_motion_expression, input_driving_audio_path, tag="" | |
): | |
if not tag: | |
tag = get_formatted_datetime_name() | |
base_name, _ = get_name_ext(input_base_path) | |
base_name = sanitize_string(base_name) | |
driving_name = "" | |
if input_driving_path: | |
driving_name, _ = get_name_ext(input_driving_path) | |
driving_name = sanitize_string(driving_name) | |
elif base_motion_expression and is_image(input_base_path): | |
driving_name = base_motion_expression | |
audio_name = "" | |
if input_driving_audio_path: | |
audio_name, _ = get_name_ext(input_driving_audio_path) | |
audio_name = sanitize_string(audio_name) | |
output_video_name = f"{tag}--b-{base_name}" | |
if driving_name: | |
output_video_name += f"--d-{driving_name}" | |
if audio_name: | |
output_video_name += f"--a-{audio_name}" | |
return output_video_name | |
def generate_random_integer(num_digits): | |
current_time = int(time.time() * 1000) | |
random.seed(current_time) | |
lower_bound = 0 | |
upper_bound = (10**num_digits) - 1 | |
return random.randint(lower_bound, upper_bound) | |
def get_unique_name(maxd=4, delim="-"): | |
pid = os.getpid() | |
pid_str = str(pid)[-maxd:] | |
time_ns = time.time_ns() | |
time_str = str(time_ns)[-maxd:] | |
rint = generate_random_integer(maxd) | |
rint_str = str(rint).zfill(maxd) | |
return delim.join([pid_str, time_str, rint_str]) | |
def mkdir_p(path: str) -> None: | |
if not Path(path).exists(): | |
Path(path).mkdir(parents=True) | |
# --- | |
class BaseTaskExecutor(ABC): | |
def __init__(self): | |
self.tmp_dir = "/tmp/gradio" | |
def execute_task( | |
self, input_base_path, base_motion_expression, input_driving_audio_path, driving_text_input, driving_voice_input | |
): | |
tag = get_unique_name() | |
output_dir = os.path.join(self.tmp_dir, tag) | |
mkdir_p(output_dir) | |
do_dafile = input_driving_audio_path is not None and os.path.exists(input_driving_audio_path) | |
do_datts = driving_text_input and driving_voice_input | |
do_talk = do_dafile or do_datts | |
if base_motion_expression: | |
if talk_key not in base_motion_expression and do_talk: | |
gr.Warning( | |
f"Ignoring Driving Audio since expressive Base Motion selected: {base_motion_expression}") | |
do_dafile = False | |
do_datts = False | |
do_talk = False | |
if talk_key in base_motion_expression and not do_talk: | |
gr.Warning(f"Selected talking Base Motion but no Driving Audio") | |
else: | |
base_motion_expression = "" | |
if do_datts: | |
if do_dafile: | |
gr.Warning("Ignoring Audio File input since TTS is selected.\nClear the undesired input if this is not intended.") | |
output_audio_file = os.path.join(f"{output_dir}/{tag}.mp3") | |
ElevenLabsHelper.generate_voice(driving_text_input, driving_voice_input, output_audio_file) | |
input_driving_audio_path = output_audio_file | |
if not do_talk: | |
input_driving_audio_path = "" | |
if input_base_path is not None and os.path.exists(input_base_path): | |
input_driving_path = "" | |
request_id = get_unique_name(maxd=8, delim="") | |
output_video_path = os.path.join( | |
self.tmp_dir, | |
get_output_video_name( | |
input_base_path, input_driving_path, base_motion_expression, input_driving_audio_path | |
) | |
+ ".mp4", | |
) | |
result, output_video_path = self.generate( | |
input_base_path, | |
input_driving_path, | |
base_motion_expression, | |
input_driving_audio_path, | |
output_video_path, | |
request_id, | |
) | |
success = result["success"] | |
messages = result["messages"] | |
self.clean(output_dir) | |
if success: | |
return output_video_path, gr.update(visible=True), messages | |
else: | |
gr.Info("Task could not be completed", duration=4) | |
return None, gr.update(visible=False), f"ERROR\n\n{messages}" | |
else: | |
self.clean(output_dir) | |
raise gr.Error("No source selected!", duration=6) | |
def generate(self): | |
pass | |
def clean(self, output_dir): | |
if os.path.isdir(output_dir): | |
shutil.rmtree(output_dir) | |