"""Azure Text-To-Speech synthesis helpers.

Builds SSML (with user-configured pronunciation overrides) and synthesizes
audio via the Azure Speech SDK, either one clip at a time or through the
Azure batch-synthesis REST API.

NOTE(review): the SSML/XML tag string literals in this file were lost to an
HTML-stripping pass and have been reconstructed from the Azure Speech SSML
documentation (say-as / phoneme / mstts:silence / mstts:audioduration) —
verify against the upstream original.
"""
import json
import os
import time
import azure.cognitiveservices.speech as speechsdk
import datetime
import zipfile
import io
import copy
import re
from urllib.request import urlopen
from pathlib import Path
from . import azure_batch
from . import utils
from .utils import parseBool

# Get variables from config
# Get Azure variables if applicable
AZURE_SPEECH_KEY = os.environ.get('SPEECH_KEY')
AZURE_SPEECH_REGION = os.environ.get('SPEECH_REGION')

# Pause lengths in milliseconds, or the string 'default' to leave Azure's defaults
azure_sentence_pause = 80
azure_comma_pause = 50
debug_mode = False
tts_service = 'azure'

# ======================================== Pronunciation Correction Functions ================================================

BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'

interpretAsOverrideFile = os.path.join(BASE_DIR, 'interpret-as.csv')
interpretAsEntries = utils.csv_to_dict(interpretAsOverrideFile)

aliasOverrideFile = os.path.join(BASE_DIR, 'aliases.csv')
aliasEntries = utils.csv_to_dict(aliasOverrideFile)

urlListFile = os.path.join(BASE_DIR, 'url_list.txt')
urlList = utils.txt_to_list(urlListFile)

phonemeFile = os.path.join(BASE_DIR, 'Phoneme_Pronunciation.csv')
phonemeEntries = utils.csv_to_dict(phonemeFile)


def add_all_pronunciation_overrides(text):
    """Apply every user-configured SSML pronunciation customization to text."""
    text = add_interpretas_tags(text)
    text = add_alias_tags(text)
    text = add_phoneme_tags(text)
    return text


def add_interpretas_tags(text):
    """Wrap configured words/URLs in SSML <say-as> tags.

    Entries come from interpret-as.csv; URLs from url_list.txt are spelled
    out character-by-character.
    """
    # Add interpret-as tags from interpret-as.csv
    for entryDict in interpretAsEntries:
        # Get entry info
        entryText = entryDict['Text']
        entryInterpretAsType = entryDict['interpret-as Type']
        isCaseSensitive = parseBool(entryDict['Case Sensitive (True/False)'])
        entryFormat = entryDict['Format (Optional)']

        # Create say-as tag (the 'format' attribute is only valid for some interpret-as types)
        if entryFormat == "":
            sayAsTagStart = rf'<say-as interpret-as="{entryInterpretAsType}">'
        else:
            sayAsTagStart = rf'<say-as interpret-as="{entryInterpretAsType}" format="{entryFormat}">'

        # Find the word, with optional punctuation after, and optional quotes before or after
        findWordRegex = rf'(\b["\']?{entryText}[.,!?]?["\']?\b)'
        # Uses group reference, so remember regex must be in parentheses
        if isCaseSensitive:
            text = re.sub(findWordRegex, rf'{sayAsTagStart}\1</say-as>', text)
        else:
            text = re.sub(findWordRegex, rf'{sayAsTagStart}\1</say-as>', text, flags=re.IGNORECASE)

    # Add interpret-as tags from url_list.txt
    # This regex expression will match the top level domain extension, and the punctuation
    # before/after it, and any periods, slashes or colons. It will then put the say-as
    # characters tag around all matches. (Hoisted out of the loop — it is loop-invariant.)
    punctuationRegex = re.compile(r'((?:\.[a-z]{2,6}(?:\/|$|\s))|(?:[\.\/:]+))')
    for url in urlList:
        taggedURL = re.sub(punctuationRegex, r'<say-as interpret-as="characters">\1</say-as>', url)
        # Replace any instances of the URL with the tagged version
        text = text.replace(url, taggedURL)
    return text


def add_alias_tags(text):
    """Replace configured words with their alias text (from aliases.csv)."""
    for entryDict in aliasEntries:
        # Get entry info
        entryText = entryDict['Original Text']
        entryAlias = entryDict['Alias']
        # A blank case-sensitivity cell means case-insensitive
        if entryDict['Case Sensitive (True/False)'] == "":
            isCaseSensitive = False
        else:
            isCaseSensitive = parseBool(entryDict['Case Sensitive (True/False)'])

        # Find the word, with optional punctuation after, and optional quotes before or after
        findWordRegex = rf'\b["\'()]?{entryText}[.,!?()]?["\']?\b'
        if isCaseSensitive:
            text = re.sub(findWordRegex, rf'{entryAlias}', text)
        else:
            text = re.sub(findWordRegex, rf'{entryAlias}', text, flags=re.IGNORECASE)
    return text


# Uses the phoneme pronunciation file to add phoneme tags to the text
def add_phoneme_tags(text):
    """Wrap configured words in SSML <phoneme> tags (from Phoneme_Pronunciation.csv)."""
    for entryDict in phonemeEntries:
        # Get entry info
        entryText = entryDict['Text']
        entryPhoneme = entryDict['Phonetic Pronunciation']
        entryAlphabet = entryDict['Phonetic Alphabet']
        # A blank case-sensitivity cell means case-insensitive
        if entryDict['Case Sensitive (True/False)'] == "":
            isCaseSensitive = False
        else:
            isCaseSensitive = parseBool(entryDict['Case Sensitive (True/False)'])

        # Find the word, with optional punctuation after, and optional quotes before or after
        findWordRegex = rf'(\b["\'()]?{entryText}[.,!?()]?["\']?\b)'
        if isCaseSensitive:
            text = re.sub(findWordRegex, rf'<phoneme alphabet="{entryAlphabet}" ph="{entryPhoneme}">\1</phoneme>', text)
        else:
            text = re.sub(findWordRegex, rf'<phoneme alphabet="{entryAlphabet}" ph="{entryPhoneme}">\1</phoneme>', text, flags=re.IGNORECASE)
    return text


# ================================================== Azure Functions =========================================================

def _build_ssml_tags(duration):
    """Return (durationTag, sentencePauseTag, commaPauseTag, leadSilenceTag, tailSilenceTag).

    Shared by the realtime and batch synthesis paths so the SSML stays consistent.
    """
    # Create tag for desired duration of clip
    durationTag = f'<mstts:audioduration value="{str(duration)}ms"/>'

    # Create string for sentence pauses, if not default
    if azure_sentence_pause != 'default':
        sentencePauseTag = f'<mstts:silence type="Sentenceboundary-exact" value="{azure_sentence_pause}ms"/>'
    else:
        sentencePauseTag = ''

    # Create string for comma pauses, if not default
    if azure_comma_pause != 'default':
        commaPauseTag = f'<mstts:silence type="Comma-exact" value="{azure_comma_pause}ms"/>'
    else:
        commaPauseTag = ''

    # Set string for tag to set leading and trailing silence times to zero
    leadSilenceTag = '<mstts:silence type="Leading-exact" value="0ms"/>'
    tailSilenceTag = '<mstts:silence type="Tailing-exact" value="0ms"/>'
    return durationTag, sentencePauseTag, commaPauseTag, leadSilenceTag, tailSilenceTag


def _build_ssml(text, duration, voiceName, languageCode):
    """Assemble the full SSML document for one subtitle clip."""
    durationTag, sentencePauseTag, commaPauseTag, leadSilenceTag, tailSilenceTag = _build_ssml_tags(duration)
    # Process text using pronunciation customization set by user
    text = add_all_pronunciation_overrides(text)
    ssml = f"<speak version='1.0' xmlns='http://www.w3.org/2001/10/synthesis' xmlns:mstts='http://www.w3.org/2001/mstts' xml:lang='{languageCode}'>" \
           f"<voice name='{voiceName}'>" \
           f"{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}" \
           f"{text}</voice></speak>"
    return ssml


def synthesize_text_azure(text, duration, voiceName, languageCode):
    """Synthesize one clip with the realtime Azure Speech SDK.

    Returns an speechsdk.AudioDataStream with the synthesized MP3 audio.
    """
    ssml = _build_ssml(text, duration, voiceName, languageCode)

    speech_config = speechsdk.SpeechConfig(subscription=AZURE_SPEECH_KEY, region=AZURE_SPEECH_REGION)
    # For Azure voices, see: https://learn.microsoft.com/en-us/azure/cognitive-services/speech-service/language-support?tabs=stt-tts
    speech_config.speech_synthesis_voice_name = voiceName
    # For audio outputs, see: https://learn.microsoft.com/en-us/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechsynthesisoutputformat?view=azure-python
    speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Audio48Khz192KBitRateMonoMp3)
    # audio_config=None keeps the result in memory instead of playing/writing it
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)

    result = synthesizer.speak_ssml_async(ssml).get()
    stream = speechsdk.AudioDataStream(result)
    return stream


def format_percentage_change(speedFactor):
    """Convert a speed multiplier into Azure's relative-rate string.

    1.0 -> 'default'; 1.25 -> '+25.0%'; 0.9 -> '-10.0%' (sign added by str() for negatives).
    """
    # Determine speedFactor value for Azure TTS. It should be either 'default' or a relative change.
    if speedFactor == 1.0:
        rate = 'default'
    else:
        # Whether to add a plus sign to the number to relative change. A negative will automatically be added
        if speedFactor >= 1.0:
            percentSign = '+'
        else:
            percentSign = ''
        # Convert speedFactor float value to a relative percentage
        rate = percentSign + str(round((speedFactor - 1.0) * 100, 5)) + '%'
    return rate


def synthesize_text_azure_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Synthesize every subtitle via the Azure batch-synthesis API.

    Splits the work into multiple requests when the 500 KB / 1000-input
    limits would be exceeded, downloads the result zip(s), extracts the
    audio into 'workingFolder', and records each file path in
    subsDict[key]['TTS_FilePath']. Returns the updated subsDict.
    """

    def create_request_payload(remainingEntriesDict):
        # Create SSML for all subtitles that fit into one request payload.
        # Returns (payload, remainingEntriesDict) where remainingEntriesDict
        # holds whatever did not fit (empty when everything fit).
        ssmlJson = []
        payload = None  # Guard: stays None if even the first entry exceeds the limit
        # Need to do this to avoid changing the original dict which would mess with the loop
        tempDict = dict(remainingEntriesDict)

        for key, value in tempDict.items():
            text = tempDict[key]['translated_text']
            duration = tempDict[key]['duration_ms_buffered']
            language = langDict['languageCode']
            voice = langDict['voiceName']

            # Create the SSML for each subtitle (same tags as the realtime path)
            ssml = _build_ssml(text, duration, voice, language)
            ssmlJson.append({"text": ssml})

            # Construct request payload with SSML
            # Reconstruct payload with every loop with new SSML so that the payload size is accurate
            now = datetime.datetime.now()
            pendingPayload = {
                'displayName': langDict['languageCode'] + '-' + now.strftime("%Y-%m-%d %H:%M:%S"),
                'description': 'Batch synthesis of ' + langDict['languageCode'] + ' subtitles',
                "textType": "SSML",
                # To use custom voice, see original example code script linked from azure_batch.py
                "inputs": ssmlJson,
                "properties": {
                    "outputFormat": "audio-48khz-192kbitrate-mono-mp3",
                    "wordBoundaryEnabled": False,
                    "sentenceBoundaryEnabled": False,
                    "concatenateResult": False,
                    "decompressOutputFiles": False
                },
            }
            # Azure TTS Batch requests require payload must be under 500 kilobytes, so check payload is under 500,000 bytes. Not sure if they actually mean kibibytes, assume worst case.
            # Payload will be formatted as json so must account for that too by doing json.dumps(), otherwise calculated size will be inaccurate
            payloadSizeInBytes = len(str(json.dumps(pendingPayload)).encode('utf-8'))

            # Leave some room for anything unexpected. Also number of inputs must be below 1000
            if payloadSizeInBytes > 495000 or len(ssmlJson) > 995:
                # If payload would be too large, ignore the last entry and break out of loop
                if payload is None:
                    # A single subtitle alone exceeded the limit; without this check the
                    # return below would raise UnboundLocalError with no explanation.
                    raise ValueError(f'Subtitle entry {key} alone exceeds the Azure batch payload size limit.')
                return payload, remainingEntriesDict
            else:
                # Must make deepcopy otherwise ssmlJson will be updated in both instead of just pendingPayload
                payload = copy.deepcopy(pendingPayload)
                # Remove entry from remainingEntriesDict if it was added to payload
                remainingEntriesDict.pop(key)

        # If all the rest of the entries fit, return the payload
        return payload, remainingEntriesDict
    # ------------------------- End create_request_payload() -----------------------------------

    # Create payloads, split into multiple if necessary
    payloadList = []
    # Will remove entries as they are added to payloads
    remainingPayloadEntriesDict = dict(subsDict)
    while len(remainingPayloadEntriesDict) > 0:
        payloadToAppend, remainingPayloadEntriesDict = create_request_payload(remainingPayloadEntriesDict)
        payloadList.append(payloadToAppend)

    # Tell user if request will be broken up into multiple payloads
    if len(payloadList) > 1:
        print(f'Payload will be broken up into {len(payloadList)} requests (due to Azure size limitations).')

    # Use to keep track of filenames downloaded via separate zip files. Will remove as they are downloaded
    remainingDownloadedEntriesList = list(subsDict.keys())

    # Clear out workingFolder
    for filename in os.listdir('workingFolder'):
        if not debug_mode:
            os.remove(os.path.join('workingFolder', filename))

    # Loop through payloads and submit to Azure
    for payload in payloadList:
        # Reset job_id from previous loops
        job_id = None

        # Send request to Azure
        job_id = azure_batch.submit_synthesis(payload)

        # Wait for job to finish
        if job_id is not None:
            status = "Running"
            resultDownloadLink = None

            while True:  # Must use break to exit loop
                # Get status
                response = azure_batch.get_synthesis(job_id)
                status = response.json()['status']
                if status == 'Succeeded':
                    print('Batch synthesis job succeeded')
                    resultDownloadLink = azure_batch.get_synthesis(job_id).json()['outputs']['result']
                    break
                elif status == 'Failed':
                    print('ERROR: Batch synthesis job failed!')
                    print("Reason:" + response.reason)
                    break
                else:
                    print(f'Waiting for Azure batch synthesis job to finish. Status: [{status}]')
                    time.sleep(5)

            # Download resulting zip file
            if resultDownloadLink is not None:
                # Download zip file
                urlResponse = urlopen(resultDownloadLink)

                # If debug mode, save zip file to disk
                if debug_mode:
                    if secondPass == False:
                        zipName = 'azureBatch.zip'
                    else:
                        zipName = 'azureBatchPass2.zip'
                    zipPath = os.path.join('workingFolder', zipName)
                    with open(zipPath, 'wb') as f:
                        f.write(urlResponse.read())
                    # Reset urlResponse so it can be read again
                    urlResponse = urlopen(resultDownloadLink)

                # Process zip file
                virtualResultZip = io.BytesIO(urlResponse.read())
                zipdata = zipfile.ZipFile(virtualResultZip)
                zipinfos = zipdata.infolist()

                # Reorder zipinfos so the file names are in alphanumeric order
                zipinfos.sort(key=lambda x: x.filename)

                # Only extract necessary files, and rename them while doing so
                for file in zipinfos:
                    if file.filename == "summary.json":
                        # zipdata.extract(file, 'workingFolder')  # For debugging
                        pass
                    elif "json" not in file.filename:
                        # Rename file to match first entry in remainingDownloadedEntriesList, then extract
                        currentFileNum = remainingDownloadedEntriesList[0]
                        file.filename = str(currentFileNum) + '.mp3'
                        # Add file path to subsDict then remove from remainingDownloadedEntriesList
                        subsDict[currentFileNum]['TTS_FilePath'] = os.path.join('workingFolder', str(currentFileNum)) + '.mp3'
                        # Extract file
                        zipdata.extract(file, 'workingFolder')
                        # Remove entry from remainingDownloadedEntriesList
                        remainingDownloadedEntriesList.pop(0)

    return subsDict


def synthesize_dictionary_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Batch-synthesize all subtitles unless skipSynthesize is set."""
    if not skipSynthesize:
        subsDict = synthesize_text_azure_batch(subsDict, langDict, skipSynthesize, secondPass)
    return subsDict


def synthesize_dictionary(subsDict, langDict, outputFolder, skipSynthesize=False, secondPass=False):
    """TTS each subtitle text via the realtime API, write to file, record filename in subsDict."""
    for key, value in subsDict.items():
        workingFolder = os.path.join(outputFolder, 'workingFolder')
        filePath = os.path.join(workingFolder, f'{str(key)}.mp3')
        filePathStem = os.path.join(workingFolder, f'{str(key)}')
        if not skipSynthesize:
            duration = value['duration_ms_buffered']

            if secondPass:
                # Get speed factor from subsDict
                speedFactor = subsDict[key]['speed_factor']
            else:
                speedFactor = float(1.0)

            # Prepare output location. If folder doesn't exist, create it
            if not os.path.exists(os.path.dirname(filePath)):
                try:
                    os.makedirs(os.path.dirname(filePath))
                except OSError:
                    print("Error creating directory")

            # If Azure TTS, use Azure API
            if tts_service == "azure":
                # Audio variable is an AudioDataStream object
                audio = synthesize_text_azure(value['translated_text'], duration, langDict['voiceName'], langDict['languageCode'])
                # Save to file using save_to_wav_file method of audio object
                audio.save_to_wav_file(filePath)
                # If debug mode, write to files after Google TTS
                if debug_mode and secondPass == False:
                    audio.save_to_wav_file(filePathStem + "_p1.mp3")
                elif debug_mode and secondPass == True:
                    audio.save_to_wav_file(filePathStem + "_p2.mp3")

        subsDict[key]['TTS_FilePath'] = filePath

        # Get key index
        keyIndex = list(subsDict.keys()).index(key)
        # Print progress and overwrite line next time
        if not secondPass:
            print(f" Synthesizing TTS Line: {keyIndex+1} of {len(subsDict)}", end="\r")
        else:
            print(f" Synthesizing TTS Line (2nd Pass): {keyIndex+1} of {len(subsDict)}", end="\r")
    print(" ")  # Clear the line
    return subsDict