import json
import os
import time
import azure.cognitiveservices.speech as speechsdk
import datetime
import zipfile
import io
import copy
import re
from urllib.request import urlopen
from pathlib import Path
from . import azure_batch
from . import utils
from .utils import parseBool
# Get variables from config
# Get Azure variables if applicable
# NOTE(review): credentials are read once at import time; changing the
# environment variables afterward has no effect on this module.
AZURE_SPEECH_KEY = os.environ.get('SPEECH_KEY')
AZURE_SPEECH_REGION = os.environ.get('SPEECH_REGION')
# Pause lengths consumed by the SSML tag builders below (presumably
# milliseconds — TODO confirm units). The value 'default' disables the tag.
azure_sentence_pause = 80
azure_comma_pause = 50
# When True, intermediate artifacts (downloaded zips, per-pass audio) are kept on disk
debug_mode = False
tts_service = 'azure'
# ======================================== Pronunciation Correction Functions ================================================
# Folder holding the user-editable pronunciation customization files.
BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'
# interpret-as overrides, loaded as a list of row dicts (see add_interpretas_tags)
interpretAsOverrideFile = os.path.join(BASE_DIR, 'interpret-as.csv')
interpretAsEntries = utils.csv_to_dict(interpretAsOverrideFile)
# Word -> spoken-alias overrides (see add_alias_tags)
aliasOverrideFile = os.path.join(BASE_DIR, 'aliases.csv')
aliasEntries = utils.csv_to_dict(aliasOverrideFile)
# URLs that should receive character-by-character treatment (see add_interpretas_tags)
urlListFile = os.path.join(BASE_DIR, 'url_list.txt')
urlList = utils.txt_to_list(urlListFile)
# Phonetic pronunciation overrides (see add_phoneme_tags)
phonemeFile = os.path.join(BASE_DIR, 'Phoneme_Pronunciation.csv')
phonemeEntries = utils.csv_to_dict(phonemeFile)
def add_all_pronunciation_overrides(text):
    """Run every user-configured pronunciation correction over *text*.

    Applies the interpret-as, alias, and phoneme substitutions in that
    order and returns the adjusted string.
    """
    for apply_override in (add_interpretas_tags, add_alias_tags, add_phoneme_tags):
        text = apply_override(text)
    return text
def add_interpretas_tags(text):
    """Apply interpret-as overrides (interpret-as.csv) and URL handling
    (url_list.txt) to *text* and return the result.

    NOTE(review): the say-as tag strings below are empty f-strings and the
    URL replacement is just the captured group, so as written both passes
    re-insert the matched text unchanged — the SSML tag markup appears to
    have been stripped from this copy; confirm against the original
    SSML_Customization implementation.
    """
    # Add interpret-as tags from interpret-as.csv
    for entryDict in interpretAsEntries:
        # Get entry info
        entryText = entryDict['Text']
        entryInterpretAsType = entryDict['interpret-as Type']
        isCaseSensitive = parseBool(entryDict['Case Sensitive (True/False)'])
        entryFormat = entryDict['Format (Optional)']
        # Create say-as tag (with or without a format attribute)
        if entryFormat == "":
            sayAsTagStart = rf''
        else:
            sayAsTagStart = rf''
        # Find and replace the word
        # NOTE(review): entryText is interpolated un-escaped, so regex
        # metacharacters in a CSV entry (e.g. '.') act as regex syntax.
        findWordRegex = rf'(\b["\']?{entryText}[.,!?]?["\']?\b)' # Find the word, with optional punctuation after, and optional quotes before or after
        if isCaseSensitive:
            text = re.sub(findWordRegex, rf'{sayAsTagStart}\1', text) # Uses group reference, so remember regex must be in parentheses
        else:
            text = re.sub(findWordRegex, rf'{sayAsTagStart}\1', text, flags=re.IGNORECASE)
    # Add interpret-as tags from url_list.txt
    for url in urlList:
        # This regex expression will match the top level domain extension, and the punctuation before/after it, and any periods, slashes or colons
        # It will then put the say-as characters tag around all matches
        punctuationRegex = re.compile(r'((?:\.[a-z]{2,6}(?:\/|$|\s))|(?:[\.\/:]+))')
        taggedURL = re.sub(punctuationRegex, r'\1', url)
        # Replace any instances of the URL with the tagged version
        text = text.replace(url, taggedURL)
    return text
def add_alias_tags(text):
    """Replace words from aliases.csv with their spoken alias in *text*.

    Each CSV row supplies 'Original Text', its 'Alias', and an optional
    case-sensitivity flag (a blank flag means case-insensitive).

    Returns the text with all alias substitutions applied.
    """
    for entryDict in aliasEntries:
        # Get entry info
        entryText = entryDict['Original Text']
        entryAlias = entryDict['Alias']
        if entryDict['Case Sensitive (True/False)'] == "":
            isCaseSensitive = False
        else:
            isCaseSensitive = parseBool(entryDict['Case Sensitive (True/False)'])
        # Find and replace the word
        # NOTE(review): entryText is interpolated un-escaped, so regex
        # metacharacters in a CSV entry act as regex syntax.
        findWordRegex = rf'\b["\'()]?{entryText}[.,!?()]?["\']?\b' # Find the word, with optional punctuation after, and optional quotes before or after
        # Use a callable replacement so the alias is inserted literally.
        # Passing entryAlias as a plain replacement string would make re.sub
        # interpret any backslashes or group references (e.g. '\\1') inside it.
        if isCaseSensitive:
            text = re.sub(findWordRegex, lambda _match: entryAlias, text)
        else:
            text = re.sub(findWordRegex, lambda _match: entryAlias, text, flags=re.IGNORECASE)
    return text
# Uses the phoneme pronunciation file to add phoneme tags to the text
def add_phoneme_tags(text):
    """Apply phoneme overrides from Phoneme_Pronunciation.csv to *text*.

    NOTE(review): the replacement pattern is just the back-reference to the
    whole match, so as written each substitution re-inserts the matched text
    unchanged — the phoneme SSML tag markup appears to have been stripped
    from this copy; confirm against the original implementation.
    """
    for entryDict in phonemeEntries:
        # Get entry info
        entryText = entryDict['Text']
        entryPhoneme = entryDict['Phonetic Pronunciation']
        entryAlphabet = entryDict['Phonetic Alphabet']
        # Blank case-sensitivity flag defaults to case-insensitive matching
        if entryDict['Case Sensitive (True/False)'] == "":
            isCaseSensitive = False
        else:
            isCaseSensitive = parseBool(entryDict['Case Sensitive (True/False)'])
        # Find and replace the word
        findWordRegex = rf'(\b["\'()]?{entryText}[.,!?()]?["\']?\b)' # Find the word, with optional punctuation after, and optional quotes before or after
        if isCaseSensitive:
            text = re.sub(findWordRegex, rf'\1', text)
        else:
            text = re.sub(findWordRegex, rf'\1', text, flags=re.IGNORECASE)
    return text
# ================================================== Azure Functions =========================================================
def synthesize_text_azure(text, duration, voiceName, languageCode):
    """Synthesize a single subtitle line with the (non-batch) Azure Speech SDK.

    Parameters:
        text: subtitle text to speak (pronunciation overrides applied here)
        duration: desired clip length — only referenced by the duration tag
            builder below, which is currently an empty f-string
        voiceName: Azure voice name (see the voice list URL below)
        languageCode: language code for the SSML wrapper

    Returns:
        A speechsdk.AudioDataStream containing the synthesized audio
        (output format is 48 kHz 192 kbit/s mono MP3).

    NOTE(review): every SSML tag f-string below is empty, so the generated
    'ssml' string is effectively just the processed text — the tag markup
    appears to have been stripped from this copy; confirm against the
    original source.
    """
    # Create tag for desired duration of clip
    durationTag = f''
    # Create string for sentence pauses, if not default
    if not azure_sentence_pause == 'default':
        sentencePauseTag = f''
    else:
        sentencePauseTag = ''
    # Create string for comma pauses, if not default
    if not azure_comma_pause == 'default':
        commaPauseTag = f''
    else:
        commaPauseTag = ''
    # Set string for tag to set leading and trailing silence times to zero
    leadSilenceTag = ''
    tailSilenceTag = ''
    # Process text using pronunciation customization set by user
    text = add_all_pronunciation_overrides(text)
    # Create SSML syntax for Azure TTS
    ssml = f"" \
        f"{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}" \
        f"{text}"
    speech_config = speechsdk.SpeechConfig(subscription=AZURE_SPEECH_KEY, region=AZURE_SPEECH_REGION)
    # For Azure voices, see: https://learn.microsoft.com/en-us/azure/cognitive-services/speech-service/language-support?tabs=stt-tts
    speech_config.speech_synthesis_voice_name=voiceName
    # For audio outputs, see: https://learn.microsoft.com/en-us/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechsynthesisoutputformat?view=azure-python
    speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Audio48Khz192KBitRateMonoMp3)
    # audio_config=None keeps the audio in memory instead of playing/saving it
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
    #result = synthesizer.speak_text_async(text).get()
    result = synthesizer.speak_ssml_async(ssml).get()
    stream = speechsdk.AudioDataStream(result)
    return stream
def format_percentage_change(speedFactor):
    """Convert a speed multiplier into Azure's prosody rate string.

    A factor of exactly 1.0 maps to 'default'; any other value becomes a
    signed relative percentage, e.g. 1.5 -> '+50.0%', 0.5 -> '-50.0%'.
    """
    if speedFactor == 1.0:
        return 'default'
    # Positive changes need an explicit '+'; negatives already carry '-'.
    sign = '+' if speedFactor >= 1.0 else ''
    # Convert the multiplier to a relative percentage, rounded to 5 decimals
    percent = round((speedFactor - 1.0) * 100, 5)
    return f'{sign}{percent}%'
def synthesize_text_azure_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Synthesize every subtitle in subsDict via the Azure batch synthesis API.

    Builds one or more SSML request payloads (splitting to stay under Azure's
    size and input-count limits), submits each through azure_batch, polls until
    the job finishes, downloads the resulting zip, and extracts the audio clips
    into 'workingFolder' named by subtitle number. Records each clip's path
    under subsDict[key]['TTS_FilePath'] and returns the updated subsDict.

    Note: skipSynthesize is accepted for signature parity but is not checked
    here; the wrapper synthesize_dictionary_batch checks it before calling.

    NOTE(review): the SSML tag f-strings below are empty — the tag markup
    appears to have been stripped from this copy.
    """
    def create_request_payload(remainingEntriesDict):
        """Build the largest request payload that fits Azure's limits.

        Pops entries from remainingEntriesDict as they are added and returns
        (payload, remainingEntriesDict); the dict is non-empty when not
        everything fit, so the caller loops.

        NOTE(review): if the very first entry already exceeds the size limit,
        'payload' is returned before it is ever assigned, which raises
        UnboundLocalError — confirm whether a single subtitle can get that big.
        """
        # Create SSML for all subtitles
        ssmlJson = []
        payloadSizeInBytes = 0
        tempDict = dict(remainingEntriesDict) # Need to do this to avoid changing the original dict which would mess with the loop
        for key, value in tempDict.items():
            text = tempDict[key]['translated_text']
            duration = tempDict[key]['duration_ms_buffered']
            language = langDict['languageCode']
            voice = langDict['voiceName']
            # Create tag for desired duration of clip
            durationTag = f''
            # Create string for sentence pauses, if not default
            if not azure_sentence_pause == 'default':
                sentencePauseTag = f''
            else:
                sentencePauseTag = ''
            # Create string for comma pauses, if not default
            if not azure_comma_pause == 'default':
                commaPauseTag = f''
            else:
                commaPauseTag = ''
            # Set string for tag to set leading and trailing silence times to zero
            leadSilenceTag = ''
            tailSilenceTag = ''
            # Process text using pronunciation customization set by user
            text = add_all_pronunciation_overrides(text)
            # Create the SSML for each subtitle
            ssml = f"" \
                f"{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}" \
                f"{text}"
            ssmlJson.append({"text": ssml})
            # Construct request payload with SSML
            # Reconstruct payload with every loop with new SSML so that the payload size is accurate
            now = datetime.datetime.now()
            pendingPayload = {
                'displayName': langDict['languageCode'] + '-' + now.strftime("%Y-%m-%d %H:%M:%S"),
                'description': 'Batch synthesis of ' + langDict['languageCode'] + ' subtitles',
                "textType": "SSML",
                # To use custom voice, see original example code script linked from azure_batch.py
                "inputs": ssmlJson,
                "properties": {
                    "outputFormat": "audio-48khz-192kbitrate-mono-mp3",
                    "wordBoundaryEnabled": False,
                    "sentenceBoundaryEnabled": False,
                    "concatenateResult": False,
                    "decompressOutputFiles": False
                },
            }
            # Azure TTS Batch requests require payload must be under 500 kilobytes, so check payload is under 500,000 bytes. Not sure if they actually mean kibibytes, assume worst case.
            # Payload will be formatted as json so must account for that too by doing json.dumps(), otherwise calculated size will be inaccurate
            payloadSizeInBytes = len(str(json.dumps(pendingPayload)).encode('utf-8'))
            if payloadSizeInBytes > 495000 or len(ssmlJson) > 995: # Leave some room for anything unexpected. Also number of inputs must be below 1000
                # If payload would be too large, ignore the last entry and break out of loop
                return payload, remainingEntriesDict
            else:
                payload = copy.deepcopy(pendingPayload) # Must make deepcopy otherwise ssmlJson will be updated in both instead of just pendingPayload
                # Remove entry from remainingEntriesDict if it was added to payload
                remainingEntriesDict.pop(key)
        # If all the rest of the entries fit, return the payload
        return payload, remainingEntriesDict
    # ------------------------- End create_request_payload() -----------------------------------
    # Create payloads, split into multiple if necessary
    payloadList = []
    remainingPayloadEntriesDict = dict(subsDict) # Will remove entries as they are added to payloads
    while len(remainingPayloadEntriesDict) > 0:
        payloadToAppend, remainingPayloadEntriesDict = create_request_payload(remainingPayloadEntriesDict)
        payloadList.append(payloadToAppend)
    # Tell user if request will be broken up into multiple payloads
    if len(payloadList) > 1:
        print(f'Payload will be broken up into {len(payloadList)} requests (due to Azure size limitations).')
    # Use to keep track of filenames downloaded via separate zip files. Will remove as they are downloaded
    remainingDownloadedEntriesList = list(subsDict.keys())
    # Clear out workingFolder
    for filename in os.listdir('workingFolder'):
        if not debug_mode:
            os.remove(os.path.join('workingFolder', filename))
    # Loop through payloads and submit to Azure
    for payload in payloadList:
        # Reset job_id from previous loops
        job_id = None
        # Send request to Azure
        job_id = azure_batch.submit_synthesis(payload)
        # Wait for job to finish
        if job_id is not None:
            status = "Running"
            resultDownloadLink = None
            while True: # Must use break to exit loop
                # Get status
                response = azure_batch.get_synthesis(job_id)
                status = response.json()['status']
                if status == 'Succeeded':
                    print('Batch synthesis job succeeded')
                    resultDownloadLink = azure_batch.get_synthesis(job_id).json()['outputs']['result']
                    break
                elif status == 'Failed':
                    print('ERROR: Batch synthesis job failed!')
                    print("Reason:" + response.reason)
                    break
                else:
                    print(f'Waiting for Azure batch synthesis job to finish. Status: [{status}]')
                    time.sleep(5)
            # Download resulting zip file
            if resultDownloadLink is not None:
                # Download zip file
                urlResponse = urlopen(resultDownloadLink)
                # If debug mode, save zip file to disk
                if debug_mode:
                    if secondPass == False:
                        zipName = 'azureBatch.zip'
                    else:
                        zipName = 'azureBatchPass2.zip'
                    zipPath = os.path.join('workingFolder', zipName)
                    with open(zipPath, 'wb') as f:
                        f.write(urlResponse.read())
                    # Reset urlResponse so it can be read again (the first read consumed the stream)
                    urlResponse = urlopen(resultDownloadLink)
                # Process zip file entirely in memory
                virtualResultZip = io.BytesIO(urlResponse.read())
                zipdata = zipfile.ZipFile(virtualResultZip)
                zipinfos = zipdata.infolist()
                # Reorder zipinfos so the file names are in alphanumeric order
                zipinfos.sort(key=lambda x: x.filename)
                # Only extract necessary files, and rename them while doing so
                for file in zipinfos:
                    if file.filename == "summary.json":
                        #zipdata.extract(file, 'workingFolder') # For debugging
                        pass
                    elif "json" not in file.filename:
                        # Rename file to match first entry in remainingDownloadedEntriesList, then extract
                        # (relies on the alphanumeric sort above matching submission order)
                        currentFileNum = remainingDownloadedEntriesList[0]
                        file.filename = str(currentFileNum) + '.mp3'
                        #file.filename = file.filename.lstrip('0')
                        # Add file path to subsDict then remove from remainingDownloadedEntriesList
                        subsDict[currentFileNum]['TTS_FilePath'] = os.path.join('workingFolder', str(currentFileNum)) + '.mp3'
                        # Extract file
                        zipdata.extract(file, 'workingFolder')
                        # Remove entry from remainingDownloadedEntriesList
                        remainingDownloadedEntriesList.pop(0)
    return subsDict
def synthesize_dictionary_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Batch-synthesis wrapper.

    Delegates to synthesize_text_azure_batch unless skipSynthesize is set,
    and returns the (possibly updated) subtitle dictionary.
    """
    if skipSynthesize:
        return subsDict
    return synthesize_text_azure_batch(subsDict, langDict, skipSynthesize, secondPass)
def synthesize_dictionary(subsDict, langDict, outputFolder, skipSynthesize=False, secondPass=False):
    """Synthesize each subtitle in subsDict one-by-one (non-batch path).

    For every entry, synthesizes the translated text via synthesize_text_azure,
    writes the audio to <outputFolder>/workingFolder/<key>.mp3, and records the
    path in subsDict[key]['TTS_FilePath']. Prints a progress line as it goes.
    Returns the updated subsDict.

    Parameters:
        subsDict: subtitle-number -> info dict (needs 'translated_text' and
            'duration_ms_buffered'; 'speed_factor' on the second pass)
        langDict: needs 'voiceName' and 'languageCode'
        outputFolder: parent folder in which 'workingFolder' is created
        skipSynthesize: when True, no audio is produced but TTS_FilePath is
            still written for each entry
        secondPass: selects the per-entry speed factor and debug filenames
    """
    for key, value in subsDict.items():
        # TTS each subtitle text, write to file, write filename into dictionary
        workingFolder = os.path.join(outputFolder, 'workingFolder')
        filePath = os.path.join(workingFolder, f'{str(key)}.mp3')
        filePathStem = os.path.join(workingFolder, f'{str(key)}')
        if not skipSynthesize:
            duration = value['duration_ms_buffered']
            if secondPass:
                # Get speed factor from subsDict
                speedFactor = subsDict[key]['speed_factor']
            else:
                speedFactor = float(1.0)
            # Prepare output location. If folder doesn't exist, create it
            if not os.path.exists(os.path.dirname(filePath)):
                try:
                    os.makedirs(os.path.dirname(filePath))
                except OSError:
                    print("Error creating directory")
            # If Azure TTS, use Azure API
            # NOTE(review): if tts_service is not 'azure', no audio file is
            # written but TTS_FilePath is still recorded below — confirm.
            if tts_service == "azure":
                # Audio variable is an AudioDataStream object
                audio = synthesize_text_azure(value['translated_text'], duration, langDict['voiceName'], langDict['languageCode'])
                # Save to file using save_to_wav_file method of audio object
                # (despite the method name, the stream holds MP3 data — the
                # output format is set to MP3 in synthesize_text_azure)
                audio.save_to_wav_file(filePath)
                # If debug mode, also keep a per-pass copy of the audio
                if debug_mode and secondPass == False:
                    audio.save_to_wav_file(filePathStem+"_p1.mp3")
                elif debug_mode and secondPass == True:
                    audio.save_to_wav_file(filePathStem+"_p2.mp3")
        subsDict[key]['TTS_FilePath'] = filePath
        # Get key index
        keyIndex = list(subsDict.keys()).index(key)
        # Print progress and overwrite line next time
        if not secondPass:
            print(f" Synthesizing TTS Line: {keyIndex+1} of {len(subsDict)}", end="\r")
        else:
            print(f" Synthesizing TTS Line (2nd Pass): {keyIndex+1} of {len(subsDict)}", end="\r")
    print(" ") # Clear the line
    return subsDict