File size: 11,184 Bytes
9b31078
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
baf4ea1
9b31078
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
#!/usr/bin/env python3
# import csv
import typing
import wave
import subprocess
import tempfile
from pathlib import Path

import gradio as gr
# from typing import Tuple, Dict
# import pandas as pd
import socket
# import yaml
# from os.path import exists


# def read_config(file) -> Dict:
#     with open(file, 'r') as f:
#         return yaml.safe_load(f)


# def write_config(file, config_data):
#     with open(file, 'w') as yaml_file:
#         yaml.dump(config_data, yaml_file, default_flow_style=False)


# def load_data(filename: str) -> pd.DataFrame:
#     text_df = pd.read_csv(filename, header=None, names=['wav', 'text'], sep='|', on_bad_lines='skip')
#     print(text_df.head(4))
#     return text_df


# def validate_index(index: int) -> int:
#     max_index = Globals['max_index']
#     if index > max_index:
#         index = max_index
#     if index < 0:
#         index = 0
#     return index


def process_text(text: str) -> typing.List:
    text2 = text.lower().split('\n')
    # mucella_001|500|Nartıme zı pşıj-themate gore yaağ. A pşıjım zerécew ştığexer pşı Jećej. A pşıjım yıe yiĺ šıfxem lıyew arixırer afemış'ejew ḣuğe.|ady-lt
    index = 0
    text3 = []
    for line in text2:
        line = line.strip()
        if line == '':
            print('-D- Skipping an empty line')
            continue
        print(f'-D- Processing line: {line}')
        line = f'mucella_{index:04d}|500|{line}|ady-lt'
        index = index + 1
        text3.append(line)
    print(f'-D- process_text() text\n-D- before:\n{text}\n-D- after:\n{text3}')
    return text3


# def get_item_data(index: int) -> Tuple[int, str, str, str]:
#     index = validate_index(index)
#     df = Globals['text_df']
#     text = df.at[index, 'text']
#     if not isinstance(text, str):
#         text = ''
#     # Limit text to 'max_characters'
#     # if len(text) > Globals['max_characters']:
#     #     text = text[0:Globals['max_characters']]
#     audio_tag = df.at[index, 'wav']
#     audio_file = None
#     if isinstance(audio_tag, str):
#         audio_file = Globals['audio_dir'] + '/' + audio_tag + '.wav'
#     save_index(index)
#     return index, text, audio_tag, audio_file


# Globals = {
#     'input_csv': '',
#     'output_csv': '',
#     'audio_dir': '',
#     'session_config_file': '',
#     'text_df': pd.DataFrame(),
#     'max_index': 0
# }
# config_file = '/home/haroon/PycharmProjects/gradio_creating_web_apis/mucella_metadata.cfg'

# python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True

Globals = {
    'python_interpreter': '/home/haroon/python_virtual_envs/few_shot_tts/bin/python3',
    'code_repository_dir': '/home/haroon/git_repos/few-shot-transformer-tts',
    'model_dir': '/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella',
    'start_step': 2580000,
    'log_dir': '/tmp/few-shot-transformer-tts-server/log',
    'data_dir': '/tmp/few-shot-transformer-tts-server/text',
    'default_text': '  Maḣe keume sépĺı!  \n  \n Harun Şewgen  \n',
    'outfile': ''
}


# def incr_index(current_index: int) -> int:
#     current_index = current_index + 1
#     current_index = validate_index(current_index)
#     return current_index


# def decr_index(current_index: int) -> int:
#     current_index = current_index - 1
#     current_index = validate_index(current_index)
#     return current_index


def concat_wavs(processed_text: str) -> str:
    temp_wav_name = f'adiga_{next(tempfile._get_candidate_names())}'
    # outfile = f"{Globals['log_dir']}/output.wav"
    outfile = f"{Globals['log_dir']}/{temp_wav_name}.wav"
    print(f'-D- Concatenating to a single wav file: {outfile}')
    # /tmp/few-shot-transformer-tts-server/log/eval_2580000/mucella_0000.wav
    data = []
    for line in processed_text:
        print(f'-D- concat_wavs() line before split: {line}')
        wav_file, _, _, _ = line.split('|')
        wav_file = f"{Globals['log_dir']}/eval_{Globals['start_step']}/{wav_file}.wav"
        print(f'-D- wav_file: {wav_file}')
        w = wave.open(wav_file, 'rb')
        data.append([w.getparams(), w.readframes(w.getnframes())])
        w.close()
    output = wave.open(outfile, 'wb')
    output.setparams(data[0][0])
    for i in range(len(data)):
        output.writeframes(data[i][1])
    output.close()
    return outfile


def speak(text: str) -> str:
    Globals['outfile'] = ''
    print('-I- Generating speech ...')
    print(f'-D- speak() text: {text}')
    processed_text = process_text(text)
    # Save text to a temporary file
    text_file = f"{Globals['data_dir']}/text.txt"
    with open(text_file, 'w') as f:
        for line in processed_text:
            f.write(f'{line}\n')
        f.close()

    # Prepare speech synthesis command:
    # python eval.py --model-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella --log-dir=/home/haroon/git_repos/few-shot-transformer-tts/Models/Mucella/synthesize/kamzegur1 --data-dir=/home/haroon/git_repos/few-shot-transformer-tts/Samples/ --eval_meta=/home/haroon/git_repos/few-shot-transformer-tts/Samples/kamzegur1.txt --start_step=2580000 --no_wait=True
    cmd = f"{Globals['python_interpreter']} {Globals['code_repository_dir']}/eval.py --model-dir={Globals['model_dir']} --log-dir={Globals['log_dir']} --data-dir={Globals['data_dir']} --eval_meta={text_file} --start_step={Globals['start_step']} --no_wait=True".split()
    print(f'-D- Speech synthesis command:\n{cmd}')
    subprocess.run(cmd)
    print(f'-D- Finished synthesizing speech.')
    outfile = concat_wavs(processed_text)
    Globals['outfile'] = outfile
    return outfile, outfile


def download() -> str:
    outfile = Globals['outfile']
    if outfile != '' and Path(outfile).is_file():
        print(f'-I- Downloading {outfile}')
        return outfile
    return None


# def handle_text_editing(index: int, text: str) -> dict:
#     index = int(index)
#     index = validate_index(index)
#     new_text = text
#     if "\n" in new_text:
#         if index == Globals['max_index']:
#             print("-W- Can't split text since there are no audio tags left. Please add more audio tags.")
#             return gr.update(value=new_text)
#         [new_text1, new_text2] = new_text.split("\n", 1)
#         orig_text = Globals['text_df'].iat[index, 1]
#         orig_text2_index = orig_text.find(new_text2)
#         if orig_text2_index == -1:
#             # If text after the new line has been modified then just take the whole original text of this item to the
#             # next item (not easy way to figure out how to cut the original text).
#             new_text2 = orig_text
#         else:
#             new_text2 = orig_text[orig_text2_index:]
#         next_item_orig_text = Globals['text_df'].iat[index + 1, 1]
#         if not isinstance(next_item_orig_text, str):
#             next_item_orig_text = ''
#         new_text2 = new_text2 + ' ' + next_item_orig_text
#         save_text(index, new_text1)
#         save_text(index + 1, new_text2)
#         return gr.update(value=new_text1)
#     save_text(index, new_text)
#     return gr.update(value=new_text)


# def update_output_file(output_file: str):
#     Globals['output_csv'] = output_file


# def save_text(index: int, text: str):
#     index = int(index)
#     index = validate_index(index)
#     Globals['text_df'].iat[index, 1] = text
#     Globals['text_df'].to_csv(Globals['output_csv'], index=False, sep='|', header=False, quoting=csv.QUOTE_NONE)


# def save_index(index: int):
#     index = validate_index(index)
#     session_config_data = {'current_index': int(index)}
#     write_config(Globals['session_config_file'], session_config_data)


def main():
    global Globals
    # # Read main config file
    # config_data = {}
    # if exists(config_file):
    #     config_data = read_config(config_file)
    # if config_data is not None:
    #     Globals.update(config_data)
    # # Read config file which was written by an earlier session of the app
    # Globals['session_config_file'] = config_file + '.session'
    # session_config_data = None
    # if exists(Globals['session_config_file']):
    #     session_config_data = read_config(Globals['session_config_file'])
    # if session_config_data is not None:
    #     Globals.update(session_config_data)

    # Globals['output_csv'] = Globals['input_csv'] + '.new'
    # Globals['text_df'] = load_data(Globals['input_csv'])
    # Globals['max_index'] = Globals['text_df'].shape[0] - 1
    # default_index = 0
    # if 'current_index' in Globals:
    #     default_index = Globals['current_index']
    # default_index = validate_index(default_index)
    # _, default_text, default_audio_tag, default_audio_file = get_item_data(default_index)
    # print(f'-D- default_text: {default_text}, default_wav: {default_audio_file}')

    # Close port(s) in case it's still open from previous session
    gr.close_all()
    with gr.Blocks() as app:
        # output_file_elem = gr.Text(label='Output File', value=Globals['output_csv'], interactive=True, max_lines=1)
        # index_elem = gr.Number(label='Index', value=default_index)
        text_elem = gr.Text(show_label=False, value=Globals['default_text'], interactive=True, max_lines=5)
        # audio_tag_elem = gr.Text(label='Audio Tag', value=default_audio_tag, interactive=False)
        # audio_file_elem = gr.Audio(show_label=False, value='')
        speak_btn = gr.Button('Speak')
        # with gr.Row():
        #     speak_btn = gr.Button('Speak')
        #     download_btn = gr.Button("Download")
        # audio_file_elem = gr.Audio(show_label=False)
        # file_elem = gr.File(visible=True)
        # file_elem.change(fn=download, inputs=[download_btn], outputs=[])

        # index_elem.change(fn=get_item_data, inputs=[index_elem], outputs=[index_elem, text_elem, audio_tag_elem, audio_file_elem])
        # prev_btn.click(fn=decr_index, inputs=[index_elem], outputs=[index_elem])
        # next_btn.click(fn=incr_index, inputs=[index_elem], outputs=[index_elem])
        speak_btn.click(fn=speak, inputs=[text_elem], outputs=[gr.Audio(show_label=False), gr.File()])
        # download_btn.click(fn=download, inputs=[], outputs=[gr.File()])
        # text_elem.change(fn=handle_text_editing, inputs=[index_elem, text_elem], outputs=[text_elem])
        # output_file_elem.change(fn=update_output_file, inputs=[output_file_elem], outputs=[])

    hostname = (([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2]if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0]
    print(f'-D- Hostname: {hostname}')
    # app.launch(server_name=hostname, server_port=6012, share=False)
    app.launch(server_name=hostname, share=True)
    exit(0)


if __name__ == '__main__':
    main()