|
|
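"""Flask TTS server.

Synthesizes speech from plain text or .srt subtitles (video dubbing) via
StyleTTS2 (msinference) or mimic3, optionally mixes in an AudioGen
background soundscape, and returns a .wav or a muxed .mp4.
"""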
|
|
|
import numpy as np |
|
import soundfile |
|
import audresample |
|
import text_utils |
|
import msinference |
|
import re |
|
import srt |
|
import subprocess |
|
import cv2 |
|
import markdown |
|
import json |
|
from pathlib import Path |
|
from types import SimpleNamespace |
|
from flask import Flask, request, send_from_directory |
|
from flask_cors import CORS |
|
from moviepy.editor import ImageClip, VideoFileClip
|
from audiocraft.audiogen import AudioGen, audio_write |
|
|
|
# Loaded once at import time; AudioGen produces 6 s clips which overlay()
# tiles (with random shifts) to cover longer speech.
sound_generator = AudioGen.get_pretrained('facebook/audiogen-medium')
sound_generator.set_generation_params(duration=6)
|
|
|
CACHE_DIR = 'flask_cache/' |
|
Path(CACHE_DIR).mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _shift(x):
    '''Circularly shift x by a random offset in [24%, 74%) of its length,
    so that tiled copies of the background clip do not repeat verbatim.'''
    n = x.shape[0]
    i = np.random.randint(int(.24 * n), max(1, int(.74 * n)))
    return np.roll(x, i)
|
|
|
def overlay(x, sound_background=None):
    '''Mix speech x with an (optional) AudioGen background clip.'''
    if sound_background is not None:
        sound_background = sound_background.detach().cpu().numpy()[0, :]
        len_speech = len(x)
        if len_speech > len(sound_background):
            # tile the 6 s background clip, randomly shifting each copy
            n_repeat = len_speech // len(sound_background) + 1
            replica = [sound_background] * n_repeat
            replica = [_shift(_) for _ in replica]
            sound_background = np.concatenate(replica)
        print(f'\nSOUND BACKGROUND SHAPE\n{sound_background.shape=}\n{x.shape=}\n- - - -')
        x = .74 * x + .26 * sound_background[:len_speech]
    return x
|
|
|
def tts_multi_sentence(precomputed_style_vector=None,
                       text=None,
                       voice=None,
                       scene=None):
    '''Create a 24 kHz np.array with TTS.

    precomputed_style_vector : required for affective TTS when voice is
                               en_US or en_UK (or None).
    text  : list of sentences (strings)
    voice : string or None (falls back to StyleTTS)
    scene : e.g. 'A castle in far away lands' -> if given, an AudioGen
            background sound scene is generated and mixed in
    '''
|
|
|
    if scene is not None:
        # generate a 6 s background soundscape from the text prompt
        sound_background = sound_generator.generate([scene])[0]
        # NOTE: assumes the bundled audiocraft's audio_write() returns the
        # loudness-normalized tensor when stem_name is None; upstream
        # audiocraft's audio_write() writes a file and returns its Path.
        sound_background = audio_write(None,
                                       sound_background.cpu(),
                                       24000,
                                       strategy="loudness",
                                       loudness_compressor=True)
    else:
        sound_background = None
|
|
|
|
|
    # check None first so the membership tests cannot raise TypeError
    if (voice is None) or ('en_US/' in voice) or ('en_UK/' in voice):
|
assert precomputed_style_vector is not None, 'For affective TTS, style vector is needed.' |
|
x = [] |
|
for _sentence in text: |
|
x.append(msinference.inference(_sentence, |
|
precomputed_style_vector, |
|
alpha=0.3, |
|
beta=0.7, |
|
diffusion_steps=7, |
|
embedding_scale=1)) |
|
x = np.concatenate(x) |
|
|
|
return overlay(x, sound_background) |
|
|
|
|
|
    # Fallback: all other voices are synthesized with mimic3 via SSML
    text_utils.store_ssml(text=text, voice=voice)
    ps = subprocess.Popen('cat _tmp_ssml.txt | mimic3 --ssml > _tmp.wav', shell=True)
    ps.wait()
    x, fs = soundfile.read('_tmp.wav')
    # audresample.resample(signal, original_rate, target_rate)
    x = audresample.resample(x.astype(np.float32), fs, 24000)[0, :]
    return overlay(x, sound_background)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = Flask(__name__) |
|
cors = CORS(app) |
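# Example request (illustrative; field names match the parsing in serve_wav,
# host/port are Flask's defaults from app.run below, and the voice string is
# a placeholder):
#   curl -X POST http://0.0.0.0:5000/ \
#        -F text=@story.txt \
#        -F voice='en_US/<dataset>#<speaker>' \
#        -F affective=True \
#        -F scene='wind and rain'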
|
|
|
|
|
@app.route("/") |
|
def index(): |
|
with open('README.md', 'r') as f: |
|
return markdown.markdown(f.read()) |
|
|
|
|
|
@app.route("/", methods=['GET', 'POST', 'PUT']) |
|
def serve_wav(): |
|
|
|
|
|
r = request.form.to_dict(flat=False) |
|
|
|
|
|
|
|
for filename, obj in request.files.items(): |
|
obj.save(f'{CACHE_DIR}{filename.replace("/","")}') |
|
|
|
print('Saved all files on Server Side\n\n') |
|
|
|
    args = SimpleNamespace(
        text=None if r.get('text') is None else CACHE_DIR + r.get('text')[0].replace("/", ""),
        video=None if r.get('video') is None else CACHE_DIR + r.get('video')[0].replace("/", ""),
        image=None if r.get('image') is None else CACHE_DIR + r.get('image')[0].replace("/", ""),
        voice=None if r.get('voice') is None else r.get('voice')[0],
        native=None if r.get('native') is None else CACHE_DIR + r.get('native')[0].replace("/", ""),
        # form values arrive as strings; parse the flag instead of relying on truthiness
        affective=r.get('affective') is not None and r.get('affective')[0].lower() == 'true',
        scene=None if r.get('scene') is None else r.get('scene')[0])
|
|
|
|
|
|
|
print(args, 'ENTER Script') |
|
    do_video_dub = args.text is not None and args.text.endswith('.srt')
|
|
|
SILENT_VIDEO = '_silent_video.mp4' |
|
AUDIO_TRACK = '_audio_track.wav' |
|
|
|
if do_video_dub: |
|
        print(f'==\nFound .srt: {args.text}, so a video file must be given as well\n\n')
|
with open(args.text, "r") as f: |
|
s = f.read() |
|
text = [[j.content, j.start.total_seconds(), j.end.total_seconds()] for j in srt.parse(s)] |
|
assert args.video is not None |
|
        native_audio_file = '_tmp.wav'
        # extract the video's audio track as 24 kHz wav
        subprocess.call(
            ["ffmpeg",
             "-y",
             "-i",
             args.video,
             "-f",
             "wav",
             "-ar",
             "24000",
             "-vn",
             native_audio_file])
        x_native, _ = soundfile.read(native_audio_file)
        if x_native.ndim > 1:
            x_native = x_native[:, 0]  # keep only the first channel
|
|
|
else: |
|
        with open(args.text, 'r') as f:
            t = f.read()
        t = re.sub(' +', ' ', t)  # collapse runs of spaces
        text = text_utils.split_into_sentences(t)
|
|
|
|
|
|
|
    # Style vector: prefer the user-provided native audio for voice cloning
    precomputed_style_vector = None
    if args.native:
        try:
            precomputed_style_vector = msinference.compute_style(args.native)
        except soundfile.LibsndfileError:
            print('\n Could not voice clone audio:', args.native,
                  'fallback to video or Internal TTS voice.\n')
            if do_video_dub:
                # fall back to the first 4 s of the video's own audio track
                soundfile.write('tgt_spk.wav',
                                x_native[:int(4 * 24000)].astype(np.float32),
                                24000)
                precomputed_style_vector = msinference.compute_style('tgt_spk.wav')
|
|
|
|
|
|
|
    if precomputed_style_vector is None:
        if args.voice and ('en_US' in args.voice or 'en_UK' in args.voice):
            # affective voices live under style_vector/, plain TTS under style_vector_v2/
            _dir = '/' if args.affective else '_v2/'
            precomputed_style_vector = msinference.compute_style(
                'assets/wavs/style_vector' + _dir
                + args.voice.replace('/', '_')
                            .replace('#', '_')
                            .replace('cmu-arctic', 'cmu_arctic')
                            .replace('_low', '')
                + '.wav')
            print('\n STYLE VECTOR \n', precomputed_style_vector.shape)
|
|
|
|
|
if args.video is not None: |
|
|
|
        # pre-render a 'TTS' banner frame to blend over dubbed segments
        frame_tts = np.zeros((104, 1920, 3), dtype=np.uint8)
        font = cv2.FONT_HERSHEY_SIMPLEX
        bottomLeftCornerOfText = (240, 74)
        fontScale = 2
        fontColor = (255, 255, 255)
        thickness = 4
        lineType = cv2.LINE_AA
        cv2.putText(frame_tts, 'TTS',
                    bottomLeftCornerOfText,
                    font,
                    fontScale,
                    fontColor,
                    thickness,
                    lineType)
|
|
|
|
|
        # pre-render an 'ORIGINAL VOICE' banner for the native-audio segments
        frame_orig = np.zeros((104, 1920, 3), dtype=np.uint8)
        font = cv2.FONT_HERSHEY_SIMPLEX
        bottomLeftCornerOfText = (101, 74)
        fontScale = 2
        fontColor = (255, 255, 255)
        thickness = 4
        lineType = cv2.LINE_AA
        cv2.putText(frame_orig, 'ORIGINAL VOICE',
                    bottomLeftCornerOfText,
                    font,
                    fontScale,
                    fontColor,
                    thickness,
                    lineType)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
video_file = args.video |
|
vf = VideoFileClip(video_file) |
|
        try:
            num = x_native.shape[0]
            # smooth 0..1 mask over time: ~1 where the TTS dub plays, ~0 where
            # the native voice plays (tanh crossfade, indexed at 24 kHz)
            is_tts = .5 + .5 * np.tanh(4 * (np.linspace(-10, 10, num) + 9.4))

            def inpaint_banner(get_frame, t):
                '''blend banner - TTS or native voice (whichever now plays)'''
                im = np.copy(get_frame(t))
                ix = min(int(t * 24000), num - 1)  # clamp to the mask length
                if is_tts[ix] > .5:
                    frame = frame_tts
                else:
                    frame = frame_orig
                h, w, _ = frame.shape
                offset_h = 24
                im[offset_h:h + offset_h, :w, :] = (.4 * im[offset_h:h + offset_h, :w, :]
                                                    + .6 * frame).astype(np.uint8)
                return im
        except UnboundLocalError:
            # x_native is undefined when no video dub ran: always show 'TTS'
            def inpaint_banner(get_frame, t):
                im = np.copy(get_frame(t))
                frame = frame_tts
                h, w, _ = frame.shape
                offset_h = 24
                im[offset_h:h + offset_h, :w, :] = (.4 * im[offset_h:h + offset_h, :w, :]
                                                    + .6 * frame).astype(np.uint8)
                return im
|
vf = vf.fl(inpaint_banner) |
|
vf.write_videofile(SILENT_VIDEO) |
|
|
|
|
|
|
|
if do_video_dub: |
|
OUT_FILE = 'tmp.mp4' |
|
subtitles = text |
|
MAX_LEN = int(subtitles[-1][2] + 17) * 24000 |
|
|
|
print("TOTAL LEN SAMPLES ", MAX_LEN, '\n====================') |
|
        pieces = []
        for _text_, orig_start, orig_end in subtitles:
            pieces.append(tts_multi_sentence(text=[_text_],
                                             precomputed_style_vector=precomputed_style_vector,
                                             voice=args.voice,
                                             scene=args.scene))
        total = np.concatenate(pieces, 0)
|
|
|
|
|
if len(x_native) > len(total): |
|
total = np.pad(total, (0, max(0, x_native.shape[0] - total.shape[0]))) |
|
|
|
else: |
|
x_native = np.pad(x_native, (0, max(0, total.shape[0] - x_native.shape[0]))) |
|
|
|
        # duck the native voice under the TTS dub
        soundfile.write(AUDIO_TRACK,
                        (.64 * total + .27 * x_native)[:, None],
                        24000)
|
else: |
|
OUT_FILE = 'tmp.mp4' |
|
x = tts_multi_sentence(text=text, |
|
precomputed_style_vector=precomputed_style_vector, |
|
voice=args.voice, |
|
scene=args.scene) |
|
soundfile.write(AUDIO_TRACK, x, 24000) |
|
|
|
|
|
|
|
if args.image is not None: |
|
|
|
STATIC_FRAME = args.image |
|
OUT_FILE = 'tmp.mp4' |
|
|
|
|
|
|
|
        # synthesize first, so the still-image video can match the audio length
        x = tts_multi_sentence(text=text,
                               precomputed_style_vector=precomputed_style_vector,
                               voice=args.voice,
                               scene=args.scene)
        soundfile.write(AUDIO_TRACK, x, 24000)

        clip_silent = ImageClip(STATIC_FRAME).set_duration(len(x) / 24000)
        clip_silent.write_videofile(SILENT_VIDEO, fps=24)
|
if args.video or args.image: |
|
|
|
subprocess.call( |
|
["ffmpeg", |
|
"-y", |
|
"-i", |
|
SILENT_VIDEO, |
|
"-i", |
|
AUDIO_TRACK, |
|
"-c:v", |
|
"copy", |
|
"-map", |
|
"0:v:0", |
|
"-map", |
|
" 1:a:0", |
|
CACHE_DIR + OUT_FILE]) |
|
|
|
        print(f'\noutput video is saved as {CACHE_DIR + OUT_FILE}')
|
|
|
else: |
|
|
|
|
|
x = tts_multi_sentence(text=text, |
|
precomputed_style_vector=precomputed_style_vector, |
|
voice=args.voice, |
|
scene=args.scene) |
|
OUT_FILE = 'tmp.wav' |
|
soundfile.write(CACHE_DIR + OUT_FILE, x, 24000) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f'\n=SERVER saved as {OUT_FILE=}\n') |
|
response = send_from_directory(CACHE_DIR, path=OUT_FILE) |
|
response.headers['suffix-file-type'] = OUT_FILE |
|
return response |
|
|
|
|
|
if __name__ == "__main__": |
|
app.run(host="0.0.0.0") |
|
|