# vidverse/app/scripts/translate.py
# (repository metadata from extraction — author: badal, commit: "feat: initial commit", 2f2406a)
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Imports
import re, regex
from . import utils
from .azure_translate import azure_translate_text
from operator import itemgetter
import sys
import copy
import os
import html
from pathlib import Path
# Maximum combined character length allowed when merging subtitle lines
# (passed to combine_subtitles_advanced by translate_dictionary).
combine_subtitles_max_chars = 200
# Which translation backend to use; only 'azure' is supported in this module.
translate_service = 'azure'
# -------------------------------- No Translate and Manual Translation Functions -----------------------------------
# Directory holding the user-editable SSML customization/override files.
BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'
# Import files and put into dictionaries
# Phrases that must never be translated (wrapped in notranslate spans before sending).
noTranslateOverrideFile = os.path.join(BASE_DIR, 'dont_translate_phrases.txt')
dontTranslateList = utils.txt_to_list(noTranslateOverrideFile)
# Per-language manual translation overrides. NOTE(review): usage below assumes
# rows expose 'Language Code', 'Original Text' and 'Translated Text' columns —
# confirm against the CSV header.
manualTranslationOverrideFile = os.path.join(BASE_DIR, 'Manual_Translations.csv')
manualTranslationsDict = utils.csv_to_dict(manualTranslationOverrideFile)
# URLs that should be protected from translation, same mechanism as phrases.
urlListFile = os.path.join(BASE_DIR, 'url_list.txt')
urlList = utils.txt_to_list(urlListFile)
# Add span tags around certain words to exclude them from being translated
def add_notranslate_tags_from_notranslate_file(text, phraseList):
    """Wrap each phrase from phraseList found in text in a notranslate span.

    The <span class="notranslate"> marker tells the translation service to
    leave the phrase untouched; the tags are stripped again afterwards by
    remove_notranslate_tags().

    Args:
        text: The string to process.
        phraseList: Iterable of literal phrases to protect from translation.

    Returns:
        The text with every matched phrase wrapped in a notranslate span.
    """
    for word in phraseList:
        # BUGFIX: escape the phrase so regex metacharacters in it (e.g. "C++",
        # "what?") cannot corrupt the pattern or match unintended text.
        # \p{Z} matches any Unicode separator, so this works with unicode characters.
        # The phrase may carry optional quotes/parens before and optional
        # punctuation/quotes after, and must be bounded by whitespace or string edges.
        findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{regex.escape(word)}[.,!?()]?["\']?)(\p{{Z}}|$)'
        findWordRegexCompiled = regex.compile(findWordRegex, flags=re.IGNORECASE | re.UNICODE)
        text = findWordRegexCompiled.sub(r'\1<span class="notranslate">\2</span>\3', text)
    return text
def remove_notranslate_tags(text):
    """Strip the <span class="notranslate"> wrappers inserted before translation."""
    for marker in ('<span class="notranslate">', '</span>'):
        text = text.replace(marker, '')
    return text
def add_notranslate_tags_for_manual_translations(text, langcode):
    """Wrap phrases that have a manual translation for langcode in notranslate spans.

    This keeps the translation service from touching them, so the manual
    replacement can be applied afterwards by replace_manual_translations().

    Args:
        text: The string to process.
        langcode: Target language code used to filter manual-translation rows.

    Returns:
        The text with matching phrases wrapped in notranslate spans.
    """
    for manualTranslatedText in manualTranslationsDict:
        # Only replace text if the language matches the entry in the manual translations file
        if manualTranslatedText['Language Code'] == langcode:
            originalText = manualTranslatedText['Original Text']
            # BUGFIX: escape the phrase so regex metacharacters in it cannot
            # corrupt the pattern. \p{Z} matches any Unicode separator.
            findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{regex.escape(originalText)}[.,!?()]?["\']?)(\p{{Z}}|$)'
            findWordRegexCompiled = regex.compile(findWordRegex, flags=re.IGNORECASE | re.UNICODE)
            text = findWordRegexCompiled.sub(r'\1<span class="notranslate">\2</span>\3', text)
    return text
# Replace certain words or phrases with their manual translation
def replace_manual_translations(text, langcode):
    """Replace phrases in text with their manual translation for langcode.

    Uses the rows loaded from Manual_Translations.csv whose 'Language Code'
    matches langcode.

    Args:
        text: The (already translated) string to post-process.
        langcode: Target language code used to filter manual-translation rows.

    Returns:
        The text with every matched phrase swapped for its manual translation.
    """
    for manualTranslatedText in manualTranslationsDict:
        # Only replace text if the language matches the entry in the manual translations file
        if manualTranslatedText['Language Code'] == langcode:
            originalText = manualTranslatedText['Original Text']
            translatedText = manualTranslatedText['Translated Text']
            # BUGFIX: escape the phrase so regex metacharacters in it cannot corrupt the pattern.
            findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{regex.escape(originalText)}[.,!?()]?["\']?)(\p{{Z}}|$)'
            findWordRegexCompiled = regex.compile(findWordRegex, flags=re.IGNORECASE | re.UNICODE)
            # BUGFIX: substitute via a callable so backslashes or group
            # references inside the manual translation are inserted literally
            # rather than being interpreted as replacement-string escapes.
            text = findWordRegexCompiled.sub(
                lambda m, t=translatedText: m.group(1) + t + m.group(3), text)
    return text
#======================================== Translate Text ================================================
# Note: This function was almost entirely written by GPT-3 after feeding it my original code and asking it to change it so it
# would break up the text into chunks if it was too long. It appears to work
def process_response_text(text, targetLanguage):
    """Post-process a translated string: decode HTML entities, strip the
    notranslate markers, then apply any manual translation overrides for
    targetLanguage."""
    unescaped = html.unescape(text)
    untagged = remove_notranslate_tags(unescaped)
    return replace_manual_translations(untagged, targetLanguage)
def split_transcript_chunks(text, max_length=5000):
    """Split a transcript into sentence-aligned chunks.

    Sentences (split after ., ! or ?) are greedily packed so that each chunk
    stays within max_length UTF-8 bytes (counting one joining space per
    sentence). A single sentence longer than the limit still becomes its own
    oversized chunk.

    Args:
        text: The transcript to split.
        max_length: Maximum chunk size in UTF-8 bytes.

    Returns:
        List of stripped chunk strings.
    """
    sentences = re.split(r'(?<=[.!?])\s+', text)
    chunks = []
    pending = []      # sentences accumulated for the current chunk
    pending_bytes = 0  # UTF-8 size of the current chunk, incl. one space per sentence

    def flush():
        if pending:
            chunks.append(' '.join(pending).strip())

    for sentence in sentences:
        size = len(sentence.encode('utf-8'))
        if pending_bytes + size + 1 <= max_length:
            pending.append(sentence)
            pending_bytes += size + 1
        else:
            flush()
            pending = [sentence]
            pending_bytes = size + 1
    flush()
    return chunks
def convertChunkListToCompatibleDict(chunkList):
    """Convert a list of text chunks into the subtitle-style dict format:
    1-based integer keys mapping to {'text': chunk} entries."""
    return {position: {'text': chunk} for position, chunk in enumerate(chunkList, 1)}
# Translate the text entries of the dictionary
def translate_dictionary(inputSubsDict, langDict, translatedSrtFileName, skipTranslation=False, ):
    """Translate the 'text' field of every subtitle entry and write an SRT file.

    Each entry in inputSubsDict gains a 'translated_text' key; entries are then
    merged by combine_subtitles_advanced and, unless translation was skipped,
    written out as a UTF-8-BOM .srt file.

    Args:
        inputSubsDict: Dict of subtitle entries; each value must contain 'text'
            (and, for the SRT write, 'srt_timestamps_line'). Mutated in place.
        langDict: Dict with 'targetLanguage', 'sourceLanguage' and
            'translateService' keys.
        translatedSrtFileName: Path of the .srt file to write.
        skipTranslation: When True, the original text is passed through the
            same post-processing instead of calling the translation service.

    Returns:
        The combined/processed subtitles dict from combine_subtitles_advanced.
    """
    targetLanguage = langDict['targetLanguage']
    sourceLanguage = langDict['sourceLanguage']
    translateService = langDict['translateService']
    # Create a container for all the text to be translated
    textToTranslate = []
    for key in inputSubsDict:
        originalText = inputSubsDict[key]['text']
        # Add any 'notranslate' tags to the text (protected phrases, URLs, and
        # phrases that will receive a manual translation afterwards)
        processedText = add_notranslate_tags_from_notranslate_file(originalText, dontTranslateList)
        processedText = add_notranslate_tags_from_notranslate_file(processedText, urlList)
        processedText = add_notranslate_tags_for_manual_translations(processedText, targetLanguage)
        # Add the text to the list of text to be translated
        textToTranslate.append(processedText)
    # Calculate the total number of utf-8 codepoints
    # NOTE(review): this total is computed but never used below — presumably a
    # leftover from the request-splitting logic described in the next comment;
    # confirm before removing.
    codepoints = 0
    for text in textToTranslate:
        codepoints += len(text.encode("utf-8"))
    # If the codepoints are greater than 28000, split the request into multiple
    # Google's API limit is 30000 Utf-8 codepoints per request, while DeepL's is 130000, but we leave some room just in case
    if skipTranslation == False:
        if translateService == 'azure':
            print("Translating text using Azure...")
            result = azure_translate_text(textToTranslate, sourceLanguage, targetLanguage)
            # Add the translated texts to the dictionary.
            # The indexing below relies on azure_translate_text returning a list
            # of dicts with a "text" key, aligned one-to-one with textToTranslate.
            for i, key in enumerate(inputSubsDict):
                inputSubsDict[key]['translated_text'] = process_response_text(result[i]["text"], targetLanguage)
                # Print progress, overwrite the same line
                print(f' Translated: {key} of {len(inputSubsDict)}', end='\r')
        else:
            print("Error: Invalid translate_service setting. Only 'Azure' is supported.")
            sys.exit()
    else:
        for key in inputSubsDict:
            inputSubsDict[key]['translated_text'] = process_response_text(inputSubsDict[key]['text'], targetLanguage) # Skips translating, such as for testing
    print(" ")
    # Merge short/fast subtitle lines up to the configured character limit
    combinedProcessedDict = combine_subtitles_advanced(inputSubsDict, int(combine_subtitles_max_chars))
    if skipTranslation == False:
        # Write new srt file with translated text (utf-8-sig adds a BOM for player compatibility)
        with open(translatedSrtFileName, 'w', encoding='utf-8-sig') as f:
            for key in combinedProcessedDict:
                f.write(str(key) + '\n')
                f.write(combinedProcessedDict[key]['srt_timestamps_line'] + '\n')
                f.write(combinedProcessedDict[key]['translated_text'] + '\n')
                f.write('\n')
    return combinedProcessedDict
##### Add additional info to the dictionary for each language #####
def set_translation_info(languageBatchDict):
    """Return a deep copy of languageBatchDict with per-language translation
    settings filled in ('translate_service' and 'formality'). Exits the
    program if no valid translation service is configured."""
    updatedBatchDict = copy.deepcopy(languageBatchDict)
    if translate_service != 'azure':
        print("Error: No valid translation service selected. Please choose a valid service or enable 'skip_translation' in config.")
        sys.exit()
    # Azure is the only supported backend; it has no formality setting.
    for langNum in updatedBatchDict:
        updatedBatchDict[langNum]['translate_service'] = 'azure'
        updatedBatchDict[langNum]['formality'] = None
    return updatedBatchDict
#======================================== Combine Subtitle Lines ================================================
def combine_subtitles_advanced(inputDict, maxCharacters=200):
    """Repeatedly merge subtitle entries until no more merges are possible.

    Entries whose speaking rate deviates from the goal are combined with
    neighbors by combine_single_pass, subject to a gap threshold and the
    maxCharacters limit. Returns a new dict re-keyed 1..n.
    """
    charRateGoal = 20  # target characters-per-second speaking rate
    gapThreshold = 100  # maximum gap (ms) between subtitles to allow combining
    # Flatten the dict into an ordered list, tagging each entry with its 0-based position
    workingList = []
    for key, value in inputDict.items():
        value['originalIndex'] = int(key) - 1
        workingList.append(value)
    finished = False
    while not finished:
        workingList, finished = combine_single_pass(workingList, charRateGoal, gapThreshold, maxCharacters)
    # Rebuild a dict with fresh 1-based keys
    return {index: entry for index, entry in enumerate(workingList, start=1)}
def combine_single_pass(entryListLocal, charRateGoal, gapThreshold, maxCharacters):
    """Perform one sweep of neighbor-combining over the subtitle entry list.

    Entries are prioritized by how far their speaking rate ('char_rate') is
    from charRateGoal; the worst offender is merged with whichever neighbor
    best moves it toward the goal, subject to gapThreshold (max 'break_until_next'
    ms between entries) and maxCharacters (max combined translated_text length).
    After each merge the list is re-rated and re-sorted.

    Args:
        entryListLocal: List of subtitle entry dicts (mutated and returned).
        charRateGoal: Target characters-per-second speaking rate.
        gapThreshold: Maximum inter-subtitle gap (ms) allowed for combining.
        maxCharacters: Maximum combined translated_text length.

    Returns:
        Tuple (entryList, noMorePossibleCombines) — the flag is True when a
        full sweep made no combination.
    """
    # Want to restart the loop if a change is made, so use this variable, otherwise break only if the end is reached
    reachedEndOfList = False
    noMorePossibleCombines = True  # Will be set to False if a combination is made
    # Use while loop because the list is being modified
    while not reachedEndOfList:
        # Need to update original index in here
        for entry in entryListLocal:
            entry['originalIndex'] = entryListLocal.index(entry)
        # Will use later to check if an entry is the last one in the list, because the last entry will have originalIndex equal to the length of the list - 1
        originalNumberOfEntries = len(entryListLocal)
        # Need to calculate the char_rate for each entry, any time something changes, so put it at the top of this loop
        entryListLocal = calc_list_speaking_rates(entryListLocal, charRateGoal)
        # Sort the list by the difference in speaking speed from charRateGoal
        priorityOrderedList = sorted(entryListLocal, key=itemgetter('char_rate_diff'), reverse=True)
        # Iterates through the list in order of priority, and uses that index to operate on entryListLocal
        # For loop is broken after a combination is made, so that the list can be re-sorted and re-iterated
        for progress, data in enumerate(priorityOrderedList):
            i = data['originalIndex']
            # Check if last entry, and therefore will end loop when done with this iteration
            if progress == len(priorityOrderedList) - 1:
                reachedEndOfList = True
            # Check if the current entry is outside the upper and lower bounds
            if (data['char_rate'] > charRateGoal or data['char_rate'] < charRateGoal):
                # Check if the entry is the first in entryListLocal, if so do not consider the previous entry
                if data['originalIndex'] == 0:
                    considerPrev = False
                else:
                    considerPrev = True
                # Check if the entry is the last in entryListLocal, if so do not consider the next entry
                if data['originalIndex'] == originalNumberOfEntries - 1:
                    considerNext = False
                else:
                    considerNext = True
                # Get the char_rate of the next and previous entries, if they exist, and calculate the difference
                # If the diff is positive, then the neighbor's rate is lower than the current char_rate
                try:
                    nextCharRate = entryListLocal[i+1]['char_rate']
                    nextDiff = data['char_rate'] - nextCharRate
                except IndexError:
                    considerNext = False
                    nextCharRate = None
                    nextDiff = None
                try:
                    prevCharRate = entryListLocal[i-1]['char_rate']
                    prevDiff = data['char_rate'] - prevCharRate
                except IndexError:
                    considerPrev = False
                    prevCharRate = None
                    prevDiff = None
            else:
                # Entry already matches the goal rate exactly — nothing to do
                continue
            # Define functions for combining with previous or next entries
            def combine_with_next():
                # Merge entry i+1 into entry i, extending text and timestamps
                entryListLocal[i]['text'] = entryListLocal[i]['text'] + ' ' + entryListLocal[i+1]['text']
                entryListLocal[i]['translated_text'] = entryListLocal[i]['translated_text'] + ' ' + entryListLocal[i+1]['translated_text']
                entryListLocal[i]['end_ms'] = entryListLocal[i+1]['end_ms']
                entryListLocal[i]['end_ms_buffered'] = entryListLocal[i+1]['end_ms_buffered']
                entryListLocal[i]['duration_ms'] = int(entryListLocal[i+1]['end_ms']) - int(entryListLocal[i]['start_ms'])
                entryListLocal[i]['duration_ms_buffered'] = int(entryListLocal[i+1]['end_ms_buffered']) - int(entryListLocal[i]['start_ms_buffered'])
                entryListLocal[i]['srt_timestamps_line'] = entryListLocal[i]['srt_timestamps_line'].split(' --> ')[0] + ' --> ' + entryListLocal[i+1]['srt_timestamps_line'].split(' --> ')[1]
                del entryListLocal[i+1]
            def combine_with_prev():
                # Merge entry i into entry i-1, extending text and timestamps
                entryListLocal[i-1]['text'] = entryListLocal[i-1]['text'] + ' ' + entryListLocal[i]['text']
                entryListLocal[i-1]['translated_text'] = entryListLocal[i-1]['translated_text'] + ' ' + entryListLocal[i]['translated_text']
                entryListLocal[i-1]['end_ms'] = entryListLocal[i]['end_ms']
                entryListLocal[i-1]['end_ms_buffered'] = entryListLocal[i]['end_ms_buffered']
                entryListLocal[i-1]['duration_ms'] = int(entryListLocal[i]['end_ms']) - int(entryListLocal[i-1]['start_ms'])
                entryListLocal[i-1]['duration_ms_buffered'] = int(entryListLocal[i]['end_ms_buffered']) - int(entryListLocal[i-1]['start_ms_buffered'])
                entryListLocal[i-1]['srt_timestamps_line'] = entryListLocal[i-1]['srt_timestamps_line'].split(' --> ')[0] + ' --> ' + entryListLocal[i]['srt_timestamps_line'].split(' --> ')[1]
                del entryListLocal[i]
            # Choose whether to consider next and previous entries, and if neither then continue to next loop
            if data['char_rate'] > charRateGoal:
                # Check to ensure next/previous rates are lower than current rate, and the combined entry is not too long, and the gap between entries is not too large
                # Need to check considerNext/considerPrev first, because running the other checks when there is no next/prev value would throw an error
                # BUGFIX: was `... or nextDiff or ...` — a truthy nextDiff wrongly
                # disqualified the next entry; `not nextDiff` matches the three
                # sibling conditions below.
                if considerNext == False or not nextDiff or nextDiff < 0 or (entryListLocal[i]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i]['translated_text']) + len(entryListLocal[i+1]['translated_text']) > maxCharacters):
                    considerNext = False
                try:
                    if considerPrev == False or not prevDiff or prevDiff < 0 or (entryListLocal[i-1]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i-1]['translated_text']) + len(entryListLocal[i]['translated_text']) > maxCharacters):
                        considerPrev = False
                except TypeError:
                    considerPrev = False
            elif data['char_rate'] < charRateGoal:
                # Check to ensure next/previous rates are higher than current rate
                if considerNext == False or not nextDiff or nextDiff > 0 or (entryListLocal[i]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i]['translated_text']) + len(entryListLocal[i+1]['translated_text']) > maxCharacters):
                    considerNext = False
                try:
                    if considerPrev == False or not prevDiff or prevDiff > 0 or (entryListLocal[i-1]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i-1]['translated_text']) + len(entryListLocal[i]['translated_text']) > maxCharacters):
                        considerPrev = False
                except TypeError:
                    considerPrev = False
            else:
                continue
            # Continue to next loop if neither are considered
            if not considerNext and not considerPrev:
                continue
            # Should only reach this point if two entries are to be combined
            if data['char_rate'] > charRateGoal:
                # If both are to be considered, then choose the one with the lower char_rate
                if considerNext and considerPrev:
                    if nextDiff < prevDiff:
                        combine_with_next()
                        noMorePossibleCombines = False
                        break
                    else:
                        combine_with_prev()
                        noMorePossibleCombines = False
                        break
                # If only one is to be considered, then combine with that one
                elif considerNext:
                    combine_with_next()
                    noMorePossibleCombines = False
                    break
                elif considerPrev:
                    combine_with_prev()
                    noMorePossibleCombines = False
                    break
                else:
                    print(f"Error U: Should not reach this point! Current entry = {i}")
                    print(f"Current Entry Text = {data['text']}")
                    continue
            elif data['char_rate'] < charRateGoal:
                # If both are to be considered, then choose the one with the higher char_rate
                if considerNext and considerPrev:
                    if nextDiff > prevDiff:
                        combine_with_next()
                        noMorePossibleCombines = False
                        break
                    else:
                        combine_with_prev()
                        noMorePossibleCombines = False
                        break
                # If only one is to be considered, then combine with that one
                elif considerNext:
                    combine_with_next()
                    noMorePossibleCombines = False
                    break
                elif considerPrev:
                    combine_with_prev()
                    noMorePossibleCombines = False
                    break
                else:
                    print(f"Error L: Should not reach this point! Index = {i}")
                    print(f"Current Entry Text = {data['text']}")
                    continue
    return entryListLocal, noMorePossibleCombines
#-- End of combine_single_pass --
#----------------------------------------------------------------------
# Calculate the number of characters per second for each subtitle entry
def calc_dict_speaking_rates(inputDict, dictKey='translated_text'):
    """Return a deep copy of inputDict where each entry gains a 'char_rate'
    key: characters per second of the dictKey text over 'duration_ms'."""
    ratedDict = copy.deepcopy(inputDict)
    for entry in ratedDict.values():
        durationSeconds = int(entry['duration_ms']) / 1000
        entry['char_rate'] = round(len(entry[dictKey]) / durationSeconds, 2)
    return ratedDict
def calc_list_speaking_rates(inputList, charRateGoal, dictKey='translated_text'):
    """Return a deep copy of inputList where each entry gains 'char_rate'
    (characters per second of the dictKey text over 'duration_ms') and
    'char_rate_diff' (absolute distance of that rate from charRateGoal)."""
    ratedList = copy.deepcopy(inputList)
    for entry in ratedList:
        durationSeconds = int(entry['duration_ms']) / 1000
        rate = round(len(entry[dictKey]) / durationSeconds, 2)
        entry['char_rate'] = rate
        entry['char_rate_diff'] = abs(round(rate - charRateGoal, 2))
    return ratedList