#!/usr/bin/env python3

# import csv
import typing
import wave
import subprocess
import tempfile
from pathlib import Path

import gradio as gr
# from typing import Tuple, Dict
# import pandas as pd
import socket
# import yaml
# from os.path import exists


# def read_config(file) -> Dict:
#     with open(file, 'r') as f:
#         return yaml.safe_load(f)


# def write_config(file, config_data):
#     with open(file, 'w') as yaml_file:
#         yaml.dump(config_data, yaml_file, default_flow_style=False)


# def load_data(filename: str) -> pd.DataFrame:
#     text_df = pd.read_csv(filename, header=None, names=['wav', 'text'], sep='|', on_bad_lines='skip')
#     print(text_df.head(4))
#     return text_df


# def validate_index(index: int) -> int:
#     max_index = Globals['max_index']
#     if index > max_index:
#         index = max_index
#     if index < 0:
#         index = 0
#     return index


def process_text(text: str) -> typing.List[str]:
    # Split the input into non-empty lines and wrap each one in the pipe-delimited
    # metadata format expected by eval.py, e.g.:
    # mucella_001|500|Nartıme zı pşıj-themate gore yaağ. A pşıjım zerécew ştığexer pşı Jećej. A pşıjım yıe yiĺ šıfxem lıyew arixırer afemış'ejew ḣuğe.|ady-lt
    text2 = text.lower().split('\n')
    index = 0
    text3 = []
    for line in text2:
        line = line.strip()
        if line == '':
            print('-D- Skipping an empty line')
            continue
        print(f'-D- Processing line: {line}')
        line = f'mucella_{index:04d}|500|{line}|ady-lt'
        index = index + 1
        text3.append(line)
    print(f'-D- process_text() text\n-D- before:\n{text}\n-D- after:\n{text3}')
    return text3

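
# For illustration, a sketch of what process_text() returns for input like the
# default_text defined in Globals below ('-D-' debug prints omitted):
#   process_text('Maḣe keume sépĺı!\n\nHarun Şewgen')
#   -> ['mucella_0000|500|maḣe keume sépĺı!|ady-lt',
#       'mucella_0001|500|harun şewgen|ady-lt']
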

# def get_item_data(index: int) -> Tuple[int, str, str, str]:
#     index = validate_index(index)
#     df = Globals['text_df']
#     text = df.at[index, 'text']
#     if not isinstance(text, str):
#         text = ''
#     # Limit text to 'max_characters'
#     # if len(text) > Globals['max_characters']:
#     #     text = text[0:Globals['max_characters']]
#     audio_tag = df.at[index, 'wav']
#     audio_file = None
#     if isinstance(audio_tag, str):
#         audio_file = Globals['audio_dir'] + '/' + audio_tag + '.wav'
#     save_index(index)
#     return index, text, audio_tag, audio_file


# Globals = {
#     'input_csv': '',
#     'output_csv': '',
#     'audio_dir': '',
#     'session_config_file': '',
#     'text_df': pd.DataFrame(),
#     'max_index': 0
# }

# config_file = '/home/haroon/PycharmProjects/gradio_creating_web_apis/mucella_metadata.cfg'

# Reference synthesis command:
# python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True

Globals = {
    'python_interpreter': '/home/haroon/python_virtual_envs/few_shot_tts/bin/python3',
    'code_repository_dir': '/home/haroon/git_repos/few-shot-transformer-tts',
    'model_dir': '/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella',
    'start_step': 2580000,
    'log_dir': '/tmp/few-shot-transformer-tts-server/log',
    'data_dir': '/tmp/few-shot-transformer-tts-server/text',
    'default_text': ' Maḣe keume sépĺı! \n \n Harun Şewgen \n',
    'outfile': ''
}


# def incr_index(current_index: int) -> int:
#     current_index = current_index + 1
#     current_index = validate_index(current_index)
#     return current_index


# def decr_index(current_index: int) -> int:
#     current_index = current_index - 1
#     current_index = validate_index(current_index)
#     return current_index


def concat_wavs(processed_text: typing.List[str]) -> str:
    # Build a unique output name; eval.py writes the individual clips as e.g.
    # /tmp/few-shot-transformer-tts-server/log/eval_2580000/mucella_0000.wav
    temp_wav_name = f'adiga_{next(tempfile._get_candidate_names())}'
    # outfile = f"{Globals['log_dir']}/output.wav"
    outfile = f"{Globals['log_dir']}/{temp_wav_name}.wav"
    print(f'-D- Concatenating to a single wav file: {outfile}')
    data = []
    for line in processed_text:
        print(f'-D- concat_wavs() line before split: {line}')
        wav_file, _, _, _ = line.split('|')
        wav_file = f"{Globals['log_dir']}/eval_{Globals['start_step']}/{wav_file}.wav"
        print(f'-D- wav_file: {wav_file}')
        w = wave.open(wav_file, 'rb')
        data.append([w.getparams(), w.readframes(w.getnframes())])
        w.close()
    output = wave.open(outfile, 'wb')
    output.setparams(data[0][0])
    for i in range(len(data)):
        output.writeframes(data[i][1])
    output.close()
    return outfile


def speak(text: str) -> typing.Tuple[str, str]:
    Globals['outfile'] = ''
    print('-I- Generating speech ...')
    print(f'-D- speak() text: {text}')
    processed_text = process_text(text)

    # Save the processed text to a temporary metadata file
    text_file = f"{Globals['data_dir']}/text.txt"
    with open(text_file, 'w') as f:
        for line in processed_text:
            f.write(f'{line}\n')

    # Prepare the speech synthesis command, e.g.:
    # python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True
    cmd = f"{Globals['python_interpreter']} {Globals['code_repository_dir']}/eval.py --model-dir={Globals['model_dir']} --log-dir={Globals['log_dir']} --data-dir={Globals['data_dir']} --eval_meta={text_file} --start_step={Globals['start_step']} --no_wait=True".split()
    print(f'-D- Speech synthesis command:\n{cmd}')
    subprocess.run(cmd)
    print('-D- Finished synthesizing speech.')
    outfile = concat_wavs(processed_text)
    Globals['outfile'] = outfile
    # Return the file twice: once for the gr.Audio player and once for the gr.File download link
    return outfile, outfile


def download() -> typing.Optional[str]:
    outfile = Globals['outfile']
    if outfile != '' and Path(outfile).is_file():
        print(f'-I- Downloading {outfile}')
        return outfile
    return None

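
# tempfile._get_candidate_names() (used in concat_wavs() above) is a private CPython
# helper and may change between Python versions. A minimal public-API alternative,
# sketched here for reference only (it is not wired into the script):
def make_temp_wav_name(prefix: str = 'adiga') -> str:
    """Return a unique basename such as 'adiga_3f2b9c1a' (no directory, no extension)."""
    import uuid
    return f'{prefix}_{uuid.uuid4().hex[:8]}'
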

# def handle_text_editing(index: int, text: str) -> dict:
#     index = int(index)
#     index = validate_index(index)
#     new_text = text
#     if "\n" in new_text:
#         if index == Globals['max_index']:
#             print("-W- Can't split text since there are no audio tags left. Please add more audio tags.")
#             return gr.update(value=new_text)
#         [new_text1, new_text2] = new_text.split("\n", 1)
#         orig_text = Globals['text_df'].iat[index, 1]
#         orig_text2_index = orig_text.find(new_text2)
#         if orig_text2_index == -1:
#             # If the text after the newline has been modified, carry the whole original text of this
#             # item over to the next item (there is no easy way to figure out where to cut the original text).
#             new_text2 = orig_text
#         else:
#             new_text2 = orig_text[orig_text2_index:]
#         next_item_orig_text = Globals['text_df'].iat[index + 1, 1]
#         if not isinstance(next_item_orig_text, str):
#             next_item_orig_text = ''
#         new_text2 = new_text2 + ' ' + next_item_orig_text
#         save_text(index, new_text1)
#         save_text(index + 1, new_text2)
#         return gr.update(value=new_text1)
#     save_text(index, new_text)
#     return gr.update(value=new_text)


# def update_output_file(output_file: str):
#     Globals['output_csv'] = output_file


# def save_text(index: int, text: str):
#     index = int(index)
#     index = validate_index(index)
#     Globals['text_df'].iat[index, 1] = text
#     Globals['text_df'].to_csv(Globals['output_csv'], index=False, sep='|', header=False, quoting=csv.QUOTE_NONE)


# def save_index(index: int):
#     index = validate_index(index)
#     session_config_data = {'current_index': int(index)}
#     write_config(Globals['session_config_file'], session_config_data)


def main():
    global Globals

    # # Read the main config file
    # config_data = {}
    # if exists(config_file):
    #     config_data = read_config(config_file)
    # if config_data is not None:
    #     Globals.update(config_data)

    # # Read the config file written by an earlier session of the app
    # Globals['session_config_file'] = config_file + '.session'
    # session_config_data = None
    # if exists(Globals['session_config_file']):
    #     session_config_data = read_config(Globals['session_config_file'])
    # if session_config_data is not None:
    #     Globals.update(session_config_data)

    # Globals['output_csv'] = Globals['input_csv'] + '.new'
    # Globals['text_df'] = load_data(Globals['input_csv'])
    # Globals['max_index'] = Globals['text_df'].shape[0] - 1

    # default_index = 0
    # if 'current_index' in Globals:
    #     default_index = Globals['current_index']
    # default_index = validate_index(default_index)
    # _, default_text, default_audio_tag, default_audio_file = get_item_data(default_index)
    # print(f'-D- default_text: {default_text}, default_wav: {default_audio_file}')

    # Close any port(s) still open from a previous session
    gr.close_all()

    with gr.Blocks() as app:
        # output_file_elem = gr.Text(label='Output File', value=Globals['output_csv'], interactive=True, max_lines=1)
        # index_elem = gr.Number(label='Index', value=default_index)
        text_elem = gr.Text(show_label=False, value=Globals['default_text'], interactive=True, max_lines=5)
        # audio_tag_elem = gr.Text(label='Audio Tag', value=default_audio_tag, interactive=False)
        # audio_file_elem = gr.Audio(show_label=False, value='')
        speak_btn = gr.Button('Speak')
        # with gr.Row():
        #     speak_btn = gr.Button('Speak')
        #     download_btn = gr.Button("Download")
        # audio_file_elem = gr.Audio(show_label=False)
        # file_elem = gr.File(visible=True)
        # file_elem.change(fn=download, inputs=[download_btn], outputs=[])
        # index_elem.change(fn=get_item_data, inputs=[index_elem], outputs=[index_elem, text_elem, audio_tag_elem, audio_file_elem])
        # prev_btn.click(fn=decr_index, inputs=[index_elem], outputs=[index_elem])
        # next_btn.click(fn=incr_index, inputs=[index_elem], outputs=[index_elem])
        speak_btn.click(fn=speak, inputs=[text_elem], outputs=[gr.Audio(show_label=False), gr.File()])
        # download_btn.click(fn=download, inputs=[], outputs=[gr.File()])
        # text_elem.change(fn=handle_text_editing, inputs=[index_elem, text_elem], outputs=[text_elem])
        # output_file_elem.change(fn=update_output_file, inputs=[output_file_elem], outputs=[])
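
    # A minimal safeguard sketch (not part of the original flow): speak() and eval.py write
    # under Globals['data_dir'] and Globals['log_dir'], which live in /tmp and may not exist
    # after a reboot. Uncommenting the two lines below would create them up front.
    # Path(Globals['data_dir']).mkdir(parents=True, exist_ok=True)
    # Path(Globals['log_dir']).mkdir(parents=True, exist_ok=True)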

    # Determine a non-loopback IP address to serve on: try the addresses of the local hostname
    # first, then fall back to a UDP "connect to 8.8.8.8" trick, then to a placeholder string.
    hostname = (
        (
            [ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] if not ip.startswith("127.")]
            or [
                [(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close())
                 for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]
            ]
        ) + ["no IP found"]
    )[0]
    print(f'-D- Hostname: {hostname}')

    # app.launch(server_name=hostname, server_port=6012, share=False)
    app.launch(server_name=hostname, share=True)
    exit(0)


if __name__ == '__main__':
    main()
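
# Usage sketch (assumptions: the few-shot-transformer-tts checkout, the Mucella model
# checkpoint at start_step 2580000, and the virtualenv interpreter configured in Globals
# all exist on this machine; the script filename below is hypothetical):
#   $ python3 tts_server.py
# The app binds to the first non-loopback IP it finds and, because share=True is passed
# to app.launch(), Gradio also prints a temporary public share link in the console.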