badal
feat: initial commit
2f2406a
import json
import os
import time
import azure.cognitiveservices.speech as speechsdk
import datetime
import zipfile
import io
import copy
import re
from urllib.request import urlopen
from pathlib import Path
from . import azure_batch
from . import utils
from .utils import parseBool
# Module-level settings used by the synthesis functions below.
# Azure credentials come from the environment; each is None when its variable is unset.
AZURE_SPEECH_KEY = os.getenv('SPEECH_KEY')
AZURE_SPEECH_REGION = os.getenv('SPEECH_REGION')

# Pause lengths (milliseconds) written into the SSML silence tags.
# The synthesis code treats the string 'default' as "emit no pause tag".
azure_sentence_pause, azure_comma_pause = 80, 50

# When True, intermediate files are kept/written for inspection
debug_mode = False

# Only the value 'azure' is acted on by synthesize_dictionary()
tts_service = 'azure'
# ======================================== Pronunciation Correction Functions ================================================
# Folder holding the user-editable pronunciation files: <two levels above this file>/SSML_Customization
BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'

# Locations of the individual override files (kept as plain strings)
interpretAsOverrideFile = str(BASE_DIR / 'interpret-as.csv')
aliasOverrideFile = str(BASE_DIR / 'aliases.csv')
urlListFile = str(BASE_DIR / 'url_list.txt')
phonemeFile = str(BASE_DIR / 'Phoneme_Pronunciation.csv')

# Load every file once at import time; the add_*_tags functions iterate over these
interpretAsEntries = utils.csv_to_dict(interpretAsOverrideFile)
aliasEntries = utils.csv_to_dict(aliasOverrideFile)
urlList = utils.txt_to_list(urlListFile)
phonemeEntries = utils.csv_to_dict(phonemeFile)
def add_all_pronunciation_overrides(text):
    """Apply every pronunciation customization to text and return the result.

    The passes run in a fixed order: interpret-as tags first, then alias
    substitutions, then phoneme tags.
    """
    return add_phoneme_tags(add_alias_tags(add_interpretas_tags(text)))
def add_interpretas_tags(text):
    """Wrap configured words and URL punctuation in SSML <say-as> tags.

    Two sources are applied in order:
      1. Each row of interpretAsEntries: every match of the row's text is
         wrapped in <say-as interpret-as="..."> (plus a format attribute when
         the row supplies one).
      2. Each URL in urlList: occurrences of the URL in text are replaced by a
         copy whose top-level-domain suffix and runs of '.', '/' and ':' are
         wrapped in <say-as interpret-as="characters">.

    Returns the modified text.
    """
    for row in interpretAsEntries:
        word = row['Text']
        interpretType = row['interpret-as Type']
        caseSensitive = parseBool(row['Case Sensitive (True/False)'])
        formatValue = row['Format (Optional)']

        # The format attribute is only present when the row provides a value
        formatAttribute = '' if formatValue == "" else rf' format="{formatValue}"'
        openingTag = rf'<say-as interpret-as="{interpretType}"{formatAttribute}>'

        # Match the word with optional trailing punctuation and optional surrounding quotes.
        # The outer parentheses matter: the replacement refers back to group 1.
        wordPattern = rf'(\b["\']?{word}[.,!?]?["\']?\b)'
        matchFlags = 0 if caseSensitive else re.IGNORECASE
        text = re.sub(wordPattern, rf'{openingTag}\1</say-as>', text, flags=matchFlags)

    # Matches a top-level-domain suffix (with the character after it), or any run of periods, slashes or colons
    urlPunctuationPattern = re.compile(r'((?:\.[a-z]{2,6}(?:\/|$|\s))|(?:[\.\/:]+))')
    for url in urlList:
        spelledOutUrl = urlPunctuationPattern.sub(r'<say-as interpret-as="characters">\1</say-as>', url)
        # Swap every literal occurrence of the URL for its tagged form
        text = text.replace(url, spelledOutUrl)

    return text
def add_alias_tags(text):
    """Replace each configured 'Original Text' in text with its 'Alias'.

    Rows come from aliasEntries. A blank case-sensitivity column means the
    match is case-insensitive; otherwise the column is interpreted by
    parseBool. Returns the modified text.
    """
    for row in aliasEntries:
        original = row['Original Text']
        alias = row['Alias']
        caseSetting = row['Case Sensitive (True/False)']
        caseSensitive = False if caseSetting == "" else parseBool(caseSetting)

        # Match the word with optional adjacent punctuation, quotes or parentheses;
        # whatever the pattern consumes is replaced by the alias
        wordPattern = rf'\b["\'()]?{original}[.,!?()]?["\']?\b'
        matchFlags = 0 if caseSensitive else re.IGNORECASE
        text = re.sub(wordPattern, rf'{alias}', text, flags=matchFlags)

    return text
# Uses the phoneme pronunciation file to add phoneme tags to the text
def add_phoneme_tags(text):
    """Wrap words listed in phonemeEntries in SSML <phoneme> tags.

    For every row, each match of the row's 'Text' is wrapped as
    <phoneme alphabet="..." ph="...">match</phoneme>, using the row's
    'Phonetic Alphabet' and 'Phonetic Pronunciation' values. A blank
    case-sensitivity column means the match is case-insensitive; otherwise the
    column is interpreted by parseBool.

    Returns the modified text.
    """
    for entryDict in phonemeEntries:
        # Get entry info
        entryText = entryDict['Text']
        entryPhoneme = entryDict['Phonetic Pronunciation']
        entryAlphabet = entryDict['Phonetic Alphabet']
        caseSetting = entryDict['Case Sensitive (True/False)']
        isCaseSensitive = False if caseSetting == "" else parseBool(caseSetting)

        # Find the word, with optional punctuation after, and optional quotes/parentheses before or after.
        # The outer parentheses matter: the replacement refers back to group 1.
        findWordRegex = rf'(\b["\'()]?{entryText}[.,!?()]?["\']?\b)'

        # Always use the alphabet configured for the entry. Previously the case-sensitive
        # path hard-coded "ipa", silently ignoring the 'Phonetic Alphabet' column.
        phonemeTag = rf'<phoneme alphabet="{entryAlphabet}" ph="{entryPhoneme}">\1</phoneme>'

        regexFlags = 0 if isCaseSensitive else re.IGNORECASE
        text = re.sub(findWordRegex, phonemeTag, text, flags=regexFlags)
    return text
# ================================================== Azure Functions =========================================================
def synthesize_text_azure(text, duration, voiceName, languageCode):
    """Synthesize a single piece of text with Azure TTS and return its audio stream.

    text is run through add_all_pronunciation_overrides and embedded in an SSML
    document that requests the given voice and language, the target clip
    duration (milliseconds), zero leading/trailing silence and, unless the
    module settings are 'default', exact sentence and comma pause lengths.

    Returns a speechsdk.AudioDataStream built from the synthesis result.
    """
    # Pause overrides are left out entirely when the setting is the string 'default'
    sentencePauseTag = ''
    if azure_sentence_pause != 'default':
        sentencePauseTag = f'<mstts:silence type="Sentenceboundary-exact" value="{azure_sentence_pause}ms"/>'

    commaPauseTag = ''
    if azure_comma_pause != 'default':
        commaPauseTag = f'<mstts:silence type="Comma-exact" value="{azure_comma_pause}ms"/>'

    # Desired length of the finished clip
    durationTag = f'<mstts:audioduration value="{duration}ms"/>'

    # No silence before or after the spoken text
    leadSilenceTag = '<mstts:silence type="Leading-exact" value="0ms"/>'
    tailSilenceTag = '<mstts:silence type="Tailing-exact" value="0ms"/>'

    # Apply the user's pronunciation customizations
    spokenText = add_all_pronunciation_overrides(text)

    ssml = (
        f"<speak version='1.0' xml:lang='{languageCode}' xmlns='http://www.w3.org/2001/10/synthesis' "
        "xmlns:mstts='http://www.w3.org/2001/mstts'>"
        f"<voice name='{voiceName}'>{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}"
        f"{spokenText}</voice></speak>"
    )

    # For Azure voices, see: https://learn.microsoft.com/en-us/azure/cognitive-services/speech-service/language-support?tabs=stt-tts
    # For audio outputs, see: https://learn.microsoft.com/en-us/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechsynthesisoutputformat?view=azure-python
    speech_config = speechsdk.SpeechConfig(subscription=AZURE_SPEECH_KEY, region=AZURE_SPEECH_REGION)
    speech_config.speech_synthesis_voice_name = voiceName
    speech_config.set_speech_synthesis_output_format(speechsdk.SpeechSynthesisOutputFormat.Audio48Khz192KBitRateMonoMp3)

    # audio_config=None: the audio is not played or written anywhere by the synthesizer itself
    synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config, audio_config=None)
    synthesisResult = synthesizer.speak_ssml_async(ssml).get()
    return speechsdk.AudioDataStream(synthesisResult)
def format_percentage_change(speedFactor):
    """Convert a speed multiplier into a rate string.

    Returns 'default' when speedFactor equals 1.0. Otherwise returns the change
    relative to 1.0 as a percentage rounded to 5 decimal places, e.g. 1.5 gives
    '+50.0%' and 0.5 gives '-50.0%'.
    """
    if speedFactor == 1.0:
        return 'default'

    # Increases need an explicit plus sign; decreases already carry their minus sign
    signPrefix = '+' if speedFactor >= 1.0 else ''
    relativePercent = round((speedFactor - 1.0) * 100, 5)
    return f'{signPrefix}{relativePercent}%'
def synthesize_text_azure_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Synthesize every subtitle in subsDict through Azure batch synthesis.

    The subtitles are converted to SSML and grouped into one or more request
    payloads, each kept under 495000 bytes of JSON and at most 995 inputs. Each
    payload is submitted via azure_batch, polled every 5 seconds until it
    succeeds or fails, and the resulting zip archive is downloaded. The audio
    files in the archive are extracted into 'workingFolder' as '<key>.mp3'.

    Args:
        subsDict: mapping of subtitle key -> entry dict. Each entry must provide
            'translated_text' and 'duration_ms_buffered'. Entries are updated
            in place with 'TTS_FilePath' once their audio has been extracted.
        langDict: must provide 'languageCode' and 'voiceName'.
        skipSynthesize: accepted for signature compatibility; not used here.
        secondPass: only affects the name of the debug copy of the zip file.

    Returns:
        subsDict (the same object). Entries belonging to a job that failed or
        could not be submitted are left without 'TTS_FilePath'.

    Raises:
        ValueError: if a single subtitle is too large to fit in a request on
            its own (previously this surfaced as an UnboundLocalError).
    """

    def build_ssml(entry):
        # Build the complete SSML document for one subtitle entry
        duration = entry['duration_ms_buffered']
        language = langDict['languageCode']
        voice = langDict['voiceName']

        # Tag for desired duration of clip
        durationTag = f'<mstts:audioduration value="{str(duration)}ms"/>'

        # Sentence / comma pause overrides are omitted when the setting is the string 'default'
        if azure_sentence_pause != 'default':
            sentencePauseTag = f'<mstts:silence type="Sentenceboundary-exact" value="{str(azure_sentence_pause)}ms"/>'
        else:
            sentencePauseTag = ''
        if azure_comma_pause != 'default':
            commaPauseTag = f'<mstts:silence type="Comma-exact" value="{str(azure_comma_pause)}ms"/>'
        else:
            commaPauseTag = ''

        # Leading and trailing silence are set to zero
        leadSilenceTag = '<mstts:silence type="Leading-exact" value="0ms"/>'
        tailSilenceTag = '<mstts:silence type="Tailing-exact" value="0ms"/>'

        # Process text using pronunciation customization set by user
        text = add_all_pronunciation_overrides(entry['translated_text'])

        return f"<speak version='1.0' xml:lang='{language}' xmlns='http://www.w3.org/2001/10/synthesis' " \
            "xmlns:mstts='http://www.w3.org/2001/mstts'>" \
            f"<voice name='{voice}'>{sentencePauseTag}{commaPauseTag}{durationTag}{leadSilenceTag}{tailSilenceTag}" \
            f"{text}</voice></speak>"

    def create_request_payload(remainingEntriesDict):
        # Fill one payload with as many of the remaining entries (in order) as fit.
        # Returns (payload, list of keys included). Does not modify remainingEntriesDict.
        ssmlJson = []
        includedKeys = []
        now = datetime.datetime.now()
        payload = {
            'displayName': langDict['languageCode'] + '-' + now.strftime("%Y-%m-%d %H:%M:%S"),
            'description': 'Batch synthesis of ' + langDict['languageCode'] + ' subtitles',
            "textType": "SSML",
            # To use custom voice, see original example code script linked from azure_batch.py
            # 'inputs' refers to the ssmlJson list itself, so appending/popping below updates the payload
            "inputs": ssmlJson,
            "properties": {
                "outputFormat": "audio-48khz-192kbitrate-mono-mp3",
                "wordBoundaryEnabled": False,
                "sentenceBoundaryEnabled": False,
                "concatenateResult": False,
                "decompressOutputFiles": False
            },
        }

        for key, entry in remainingEntriesDict.items():
            ssmlJson.append({"text": build_ssml(entry)})

            # Azure TTS Batch requests require the payload to be under 500 kilobytes. Not sure if they actually
            # mean kibibytes, so assume the worst case and leave some room for anything unexpected.
            # The size is measured on the JSON-serialized payload, because that is the form that gets sent.
            payloadSizeInBytes = len(json.dumps(payload).encode('utf-8'))
            # The number of inputs must also stay below 1000
            if payloadSizeInBytes > 495000 or len(ssmlJson) > 995:
                # Too large with this entry: take it back out and leave it for the next payload
                ssmlJson.pop()
                break
            includedKeys.append(key)

        if not includedKeys:
            # Not even the first entry fits; retrying could never make progress
            raise ValueError('A single subtitle entry is too large to fit in an Azure batch synthesis request.')
        return payload, includedKeys
    # ------------------------- End create_request_payload() -----------------------------------

    # Create payloads, split into multiple if necessary.
    # Each payload is stored together with the subtitle keys it contains, so that downloaded
    # audio can be matched to the right subtitles even if another payload's job fails.
    payloadList = []
    remainingPayloadEntriesDict = dict(subsDict)  # Shallow copy; entries are removed as they are added to payloads
    while remainingPayloadEntriesDict:
        payloadToAppend, payloadKeys = create_request_payload(remainingPayloadEntriesDict)
        for key in payloadKeys:
            del remainingPayloadEntriesDict[key]
        payloadList.append((payloadToAppend, payloadKeys))

    # Tell user if request will be broken up into multiple payloads
    if len(payloadList) > 1:
        print(f'Payload will be broken up into {len(payloadList)} requests (due to Azure size limitations).')

    # Clear out workingFolder (kept as-is in debug mode). Create it first so listing cannot fail.
    os.makedirs('workingFolder', exist_ok=True)
    if not debug_mode:
        for filename in os.listdir('workingFolder'):
            os.remove(os.path.join('workingFolder', filename))

    # Loop through payloads and submit to Azure
    for payload, payloadKeys in payloadList:
        job_id = azure_batch.submit_synthesis(payload)
        if job_id is None:
            print(f'ERROR: Batch synthesis job could not be submitted. Skipping {len(payloadKeys)} subtitle(s).')
            continue

        # Wait for job to finish
        resultDownloadLink = None
        while True:  # Must use break to exit loop
            response = azure_batch.get_synthesis(job_id)
            responseJson = response.json()
            status = responseJson['status']
            if status == 'Succeeded':
                print('Batch synthesis job succeeded')
                # Read the link from the same status response instead of querying the job a second time
                resultDownloadLink = responseJson['outputs']['result']
                break
            elif status == 'Failed':
                print('ERROR: Batch synthesis job failed!')
                print("Reason:" + response.reason)
                break
            else:
                print(f'Waiting for Azure batch synthesis job to finish. Status: [{status}]')
                time.sleep(5)

        if resultDownloadLink is None:
            # Job failed; its subtitles simply receive no 'TTS_FilePath'
            continue

        # Download the resulting zip file once and make sure the connection is closed
        with urlopen(resultDownloadLink) as urlResponse:
            zipBytes = urlResponse.read()

        # If debug mode, also save the zip file to disk
        if debug_mode:
            zipName = 'azureBatchPass2.zip' if secondPass else 'azureBatch.zip'
            with open(os.path.join('workingFolder', zipName), 'wb') as f:
                f.write(zipBytes)

        # Process zip file
        with zipfile.ZipFile(io.BytesIO(zipBytes)) as zipdata:
            # Skip summary.json and any other json members; order the rest by file name
            # so that they line up with the order of the inputs in the payload
            audioInfos = sorted(
                (info for info in zipdata.infolist() if "json" not in info.filename),
                key=lambda info: info.filename,
            )
            if len(audioInfos) != len(payloadKeys):
                print(f'WARNING: Expected {len(payloadKeys)} audio files from Azure but the archive contains {len(audioInfos)}.')

            for currentFileNum, info in zip(payloadKeys, audioInfos):
                # Renaming the archive member makes extract() write it under the subtitle's key
                info.filename = str(currentFileNum) + '.mp3'
                # Add file path to subsDict
                subsDict[currentFileNum]['TTS_FilePath'] = os.path.join('workingFolder', str(currentFileNum)) + '.mp3'
                # Extract file
                zipdata.extract(info, 'workingFolder')

    return subsDict
def synthesize_dictionary_batch(subsDict, langDict, skipSynthesize=False, secondPass=False):
    """Run batch synthesis for subsDict unless skipSynthesize is set.

    When skipSynthesize is truthy, subsDict is returned untouched; otherwise the
    result of synthesize_text_azure_batch is returned.
    """
    if skipSynthesize:
        return subsDict
    return synthesize_text_azure_batch(subsDict, langDict, skipSynthesize, secondPass)
def synthesize_dictionary(subsDict, langDict, outputFolder, skipSynthesize=False, secondPass=False):
    """Synthesize each subtitle individually and record the audio path on each entry.

    For every entry the target path is <outputFolder>/workingFolder/<key>.mp3.
    Unless skipSynthesize is set (and tts_service is 'azure'), the entry's
    'translated_text' is synthesized with synthesize_text_azure and saved to
    that path. The path is stored under 'TTS_FilePath' whether or not synthesis
    ran, so existing files can be reused with skipSynthesize=True.

    Args:
        subsDict: mapping of subtitle key -> entry dict. When synthesizing, each
            entry must provide 'translated_text' and 'duration_ms_buffered'.
        langDict: must provide 'voiceName' and 'languageCode' when synthesizing.
        outputFolder: folder under which 'workingFolder' is created.
        skipSynthesize: when True, no audio is generated.
        secondPass: changes the progress message and the debug file suffix.

    Returns:
        subsDict (the same object, updated in place).
    """
    workingFolder = os.path.join(outputFolder, 'workingFolder')
    totalLines = len(subsDict)
    progressLabel = "Synthesizing TTS Line (2nd Pass)" if secondPass else "Synthesizing TTS Line"

    # enumerate() supplies the progress index directly instead of searching the key list on every iteration
    for keyIndex, (key, value) in enumerate(subsDict.items()):
        # TTS each subtitle text, write to file, write filename into dictionary
        filePath = os.path.join(workingFolder, f'{str(key)}.mp3')
        filePathStem = os.path.join(workingFolder, f'{str(key)}')

        if not skipSynthesize:
            # Clip length is controlled by this duration; the entry's 'speed_factor' is not used here
            duration = value['duration_ms_buffered']

            # Prepare output location. Best effort: a failure is reported but does not abort the run
            try:
                os.makedirs(workingFolder, exist_ok=True)
            except OSError:
                print("Error creating directory")

            # If Azure TTS, use Azure API
            if tts_service == "azure":
                # Audio variable is an AudioDataStream object
                audio = synthesize_text_azure(value['translated_text'], duration, langDict['voiceName'], langDict['languageCode'])
                # Save to file using save_to_wav_file method of audio object
                audio.save_to_wav_file(filePath)

                # If debug mode, also keep a copy per pass
                if debug_mode:
                    passSuffix = "_p2.mp3" if secondPass else "_p1.mp3"
                    audio.save_to_wav_file(filePathStem + passSuffix)

        subsDict[key]['TTS_FilePath'] = filePath

        # Print progress and overwrite line next time
        print(f" {progressLabel}: {keyIndex+1} of {totalLines}", end="\r")

    print(" ")  # Clear the line
    return subsDict