#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

# Imports
import re, regex
from . import utils
from .azure_translate import azure_translate_text
from operator import itemgetter
import sys
import copy
import os
import html
from pathlib import Path

combine_subtitles_max_chars = 200
translate_service = 'azure'

# -------------------------------- No Translate and Manual Translation Functions -----------------------------------
BASE_DIR = Path(__file__).resolve().parent.parent / 'SSML_Customization'

# Import files and put into dictionaries
noTranslateOverrideFile = os.path.join(BASE_DIR, 'dont_translate_phrases.txt')
dontTranslateList = utils.txt_to_list(noTranslateOverrideFile)

manualTranslationOverrideFile = os.path.join(BASE_DIR, 'Manual_Translations.csv')
manualTranslationsDict = utils.csv_to_dict(manualTranslationOverrideFile)

urlListFile = os.path.join(BASE_DIR, 'url_list.txt')
urlList = utils.txt_to_list(urlListFile)

# Add span tags around certain words to exclude them from being translated
def add_notranslate_tags_from_notranslate_file(text, phraseList):
    for word in phraseList:
        findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{word}[.,!?()]?["\']?)(\p{{Z}}|$)'  # \p ensures it works with unicode characters
        findWordRegexCompiled = regex.compile(findWordRegex, flags=re.IGNORECASE | re.UNICODE)
        # Find the word, with optional punctuation after, and optional quotes before or after
        text = findWordRegexCompiled.sub(r'\1<span class="notranslate">\2</span>\3', text)
    return text

def remove_notranslate_tags(text):
    text = text.replace('<span class="notranslate">', '').replace('</span>', '')
    return text

def add_notranslate_tags_for_manual_translations(text, langcode):
    for manualTranslatedText in manualTranslationsDict:
        # Only replace text if the language matches the entry in the manual translations file
        if manualTranslatedText['Language Code'] == langcode:
            originalText = manualTranslatedText['Original Text']
            findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{originalText}[.,!?()]?["\']?)(\p{{Z}}|$)'
            findWordRegexCompiled = regex.compile(findWordRegex, flags=re.IGNORECASE | re.UNICODE)
            text = findWordRegexCompiled.sub(r'\1<span class="notranslate">\2</span>\3', text)
    return text

# Replace certain words or phrases with their manual translation
def replace_manual_translations(text, langcode):
    for manualTranslatedText in manualTranslationsDict:
        # Only replace text if the language matches the entry in the manual translations file
        if manualTranslatedText['Language Code'] == langcode:
            originalText = manualTranslatedText['Original Text']
            translatedText = manualTranslatedText['Translated Text']
            findWordRegex = rf'(\p{{Z}}|^)(["\'()]?{originalText}[.,!?()]?["\']?)(\p{{Z}}|$)'
            findWordRegexCompiled = regex.compile(findWordRegex, flags=re.IGNORECASE | re.UNICODE)
            # Substitute the matched word with the translated text
            text = findWordRegexCompiled.sub(rf'\1{translatedText}\3', text)
    return text
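
# Illustrative example (hypothetical phrase list entry, assuming the translation request is sent
# as HTML so that class="notranslate" spans are honored by the service):
#   add_notranslate_tags_from_notranslate_file('Check out SomeBrandName today', ['SomeBrandName'])
#     -> 'Check out <span class="notranslate">SomeBrandName</span> today'
# After the translated text comes back, remove_notranslate_tags() strips these spans out again.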

#======================================== Translate Text ================================================
# Note: This function was almost entirely written by GPT-3 after feeding it my original code and asking it
# to change it so it would break up the text into chunks if it was too long. It appears to work

def process_response_text(text, targetLanguage):
    text = html.unescape(text)
    text = remove_notranslate_tags(text)
    text = replace_manual_translations(text, targetLanguage)
    return text

def split_transcript_chunks(text, max_length=5000):
    # Calculate the total number of utf-8 codepoints
    #totalCodepoints = len(text.encode("utf-8"))

    # Split the transcript into sentences
    sentences = re.split(r'(?<=[.!?])\s+', text)

    # Initialize a list to store the chunks of text
    chunks = []

    # Initialize a string to store a chunk of text
    chunk = ""

    # For each sentence in the list of sentences
    for sentence in sentences:
        # If adding the sentence to the chunk would keep it within the maximum length
        if len(chunk.encode("utf-8")) + len(sentence.encode("utf-8")) + 1 <= max_length:  # Adding 1 to account for space
            # Add the sentence to the chunk
            chunk += sentence + " "
        else:
            # If adding the sentence would exceed the maximum length and chunk is not empty
            if chunk:
                # Add the chunk to the list of chunks
                chunks.append(chunk.strip())
            # Start a new chunk with the current sentence
            chunk = sentence + " "

    # Add the last chunk to the list of chunks (if it's not empty)
    if chunk:
        chunks.append(chunk.strip())

    # Return the list of chunks
    return chunks

def convertChunkListToCompatibleDict(chunkList):
    # Create dictionary with numbers as keys and chunks as values
    chunkDict = {}
    for i, chunk in enumerate(chunkList, 1):
        chunkDict[i] = {'text': chunk}
    return chunkDict
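
# Illustrative example (hypothetical values): with max_length=40,
#   split_transcript_chunks("First sentence here. Second one. Third sentence is longer.", 40)
# keeps adding whole sentences to a chunk until the next one would push the UTF-8 byte count
# past 40, then starts a new chunk:
#   ['First sentence here. Second one.', 'Third sentence is longer.']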
Only 'Azure' is supported.") sys.exit() else: for key in inputSubsDict: inputSubsDict[key]['translated_text'] = process_response_text(inputSubsDict[key]['text'], targetLanguage) # Skips translating, such as for testing print(" ") combinedProcessedDict = combine_subtitles_advanced(inputSubsDict, int(combine_subtitles_max_chars)) if skipTranslation == False: # Write new srt file with translated text with open(translatedSrtFileName, 'w', encoding='utf-8-sig') as f: for key in combinedProcessedDict: f.write(str(key) + '\n') f.write(combinedProcessedDict[key]['srt_timestamps_line'] + '\n') f.write(combinedProcessedDict[key]['translated_text'] + '\n') f.write('\n') return combinedProcessedDict ##### Add additional info to the dictionary for each language ##### def set_translation_info(languageBatchDict): newBatchSettingsDict = copy.deepcopy(languageBatchDict) # If using Azure, set all languages to use Azure in dictionary if translate_service == 'azure': for langNum, langInfo in languageBatchDict.items(): newBatchSettingsDict[langNum]['translate_service'] = 'azure' newBatchSettingsDict[langNum]['formality'] = None else: print("Error: No valid translation service selected. Please choose a valid service or enable 'skip_translation' in config.") sys.exit() return newBatchSettingsDict #======================================== Combine Subtitle Lines ================================================ def combine_subtitles_advanced(inputDict, maxCharacters=200): charRateGoal = 20 #20 gapThreshold = 100 # The maximum gap between subtitles to combine noMorePossibleCombines = False # Convert dictionary to list of dictionaries of the values entryList = [] for key, value in inputDict.items(): value['originalIndex'] = int(key)-1 entryList.append(value) while not noMorePossibleCombines: entryList, noMorePossibleCombines = combine_single_pass(entryList, charRateGoal, gapThreshold, maxCharacters) # Convert the list back to a dictionary then return it return dict(enumerate(entryList, start=1)) def combine_single_pass(entryListLocal, charRateGoal, gapThreshold, maxCharacters): # Want to restart the loop if a change is made, so use this variable, otherwise break only if the end is reached reachedEndOfList = False noMorePossibleCombines = True # Will be set to False if a combination is made # Use while loop because the list is being modified while not reachedEndOfList: # Need to update original index in here for entry in entryListLocal: entry['originalIndex'] = entryListLocal.index(entry) # Will use later to check if an entry is the last one in the list, because the last entry will have originalIndex equal to the length of the list - 1 originalNumberOfEntries = len(entryListLocal) # Need to calculate the char_rate for each entry, any time something changes, so put it at the top of this loop entryListLocal = calc_list_speaking_rates(entryListLocal, charRateGoal) # Sort the list by the difference in speaking speed from charRateGoal priorityOrderedList = sorted(entryListLocal, key=itemgetter('char_rate_diff'), reverse=True) # Iterates through the list in order of priority, and uses that index to operate on entryListLocal # For loop is broken after a combination is made, so that the list can be re-sorted and re-iterated for progress, data in enumerate(priorityOrderedList): i = data['originalIndex'] # Check if last entry, and therefore will end loop when done with this iteration if progress == len(priorityOrderedList) - 1: reachedEndOfList = True # Check if the current entry is outside the upper and lower bounds if 

def combine_single_pass(entryListLocal, charRateGoal, gapThreshold, maxCharacters):
    # Want to restart the loop if a change is made, so use this variable, otherwise break only if the end is reached
    reachedEndOfList = False
    noMorePossibleCombines = True # Will be set to False if a combination is made

    # Use while loop because the list is being modified
    while not reachedEndOfList:

        # Need to update original index in here
        for entry in entryListLocal:
            entry['originalIndex'] = entryListLocal.index(entry)

        # Will use later to check if an entry is the last one in the list, because the last entry will have originalIndex equal to the length of the list - 1
        originalNumberOfEntries = len(entryListLocal)

        # Need to calculate the char_rate for each entry, any time something changes, so put it at the top of this loop
        entryListLocal = calc_list_speaking_rates(entryListLocal, charRateGoal)

        # Sort the list by the difference in speaking speed from charRateGoal
        priorityOrderedList = sorted(entryListLocal, key=itemgetter('char_rate_diff'), reverse=True)

        # Iterates through the list in order of priority, and uses that index to operate on entryListLocal
        # For loop is broken after a combination is made, so that the list can be re-sorted and re-iterated
        for progress, data in enumerate(priorityOrderedList):
            i = data['originalIndex']

            # Check if last entry, and therefore will end loop when done with this iteration
            if progress == len(priorityOrderedList) - 1:
                reachedEndOfList = True

            # Check if the current entry is outside the upper and lower bounds
            if (data['char_rate'] > charRateGoal or data['char_rate'] < charRateGoal):

                # Check if the entry is the first in entryListLocal, if so do not consider the previous entry
                if data['originalIndex'] == 0:
                    considerPrev = False
                else:
                    considerPrev = True

                # Check if the entry is the last in entryListLocal, if so do not consider the next entry
                if data['originalIndex'] == originalNumberOfEntries - 1:
                    considerNext = False
                else:
                    considerNext = True

                # Check if current entry is still in the list - if it has been combined with another entry, it will not be

                # Get the char_rate of the next and previous entries, if they exist, and calculate the difference
                # If the diff is positive, then it is lower than the current char_rate
                try:
                    nextCharRate = entryListLocal[i+1]['char_rate']
                    nextDiff = data['char_rate'] - nextCharRate
                except IndexError:
                    considerNext = False
                    nextCharRate = None
                    nextDiff = None
                try:
                    prevCharRate = entryListLocal[i-1]['char_rate']
                    prevDiff = data['char_rate'] - prevCharRate
                except IndexError:
                    considerPrev = False
                    prevCharRate = None
                    prevDiff = None

            else:
                continue

            # Define functions for combining with previous or next entries - Generated with copilot, it's possible this isn't perfect
            def combine_with_next():
                entryListLocal[i]['text'] = entryListLocal[i]['text'] + ' ' + entryListLocal[i+1]['text']
                entryListLocal[i]['translated_text'] = entryListLocal[i]['translated_text'] + ' ' + entryListLocal[i+1]['translated_text']
                entryListLocal[i]['end_ms'] = entryListLocal[i+1]['end_ms']
                entryListLocal[i]['end_ms_buffered'] = entryListLocal[i+1]['end_ms_buffered']
                entryListLocal[i]['duration_ms'] = int(entryListLocal[i+1]['end_ms']) - int(entryListLocal[i]['start_ms'])
                entryListLocal[i]['duration_ms_buffered'] = int(entryListLocal[i+1]['end_ms_buffered']) - int(entryListLocal[i]['start_ms_buffered'])
                entryListLocal[i]['srt_timestamps_line'] = entryListLocal[i]['srt_timestamps_line'].split(' --> ')[0] + ' --> ' + entryListLocal[i+1]['srt_timestamps_line'].split(' --> ')[1]
                del entryListLocal[i+1]

            def combine_with_prev():
                entryListLocal[i-1]['text'] = entryListLocal[i-1]['text'] + ' ' + entryListLocal[i]['text']
                entryListLocal[i-1]['translated_text'] = entryListLocal[i-1]['translated_text'] + ' ' + entryListLocal[i]['translated_text']
                entryListLocal[i-1]['end_ms'] = entryListLocal[i]['end_ms']
                entryListLocal[i-1]['end_ms_buffered'] = entryListLocal[i]['end_ms_buffered']
                entryListLocal[i-1]['duration_ms'] = int(entryListLocal[i]['end_ms']) - int(entryListLocal[i-1]['start_ms'])
                entryListLocal[i-1]['duration_ms_buffered'] = int(entryListLocal[i]['end_ms_buffered']) - int(entryListLocal[i-1]['start_ms_buffered'])
                entryListLocal[i-1]['srt_timestamps_line'] = entryListLocal[i-1]['srt_timestamps_line'].split(' --> ')[0] + ' --> ' + entryListLocal[i]['srt_timestamps_line'].split(' --> ')[1]
                del entryListLocal[i]

            # Choose whether to consider next and previous entries, and if neither then continue to next loop
            if data['char_rate'] > charRateGoal:
                # Check to ensure next/previous rates are lower than current rate, and the combined entry is not too long, and the gap between entries is not too large
                # Need to add check for considerNext and considerPrev first, because if run other checks when there is no next/prev value to check, it will throw an error
                if considerNext == False or not nextDiff or nextDiff < 0 or (entryListLocal[i]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i]['translated_text']) + len(entryListLocal[i+1]['translated_text']) > maxCharacters):
                    considerNext = False
                try:
                    if considerPrev == False or not prevDiff or prevDiff < 0 or (entryListLocal[i-1]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i-1]['translated_text']) + len(entryListLocal[i]['translated_text']) > maxCharacters):
                        considerPrev = False
                except TypeError:
                    considerPrev = False

            elif data['char_rate'] < charRateGoal:
                # Check to ensure next/previous rates are higher than current rate
                if considerNext == False or not nextDiff or nextDiff > 0 or (entryListLocal[i]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i]['translated_text']) + len(entryListLocal[i+1]['translated_text']) > maxCharacters):
                    considerNext = False
                try:
                    if considerPrev == False or not prevDiff or prevDiff > 0 or (entryListLocal[i-1]['break_until_next'] >= gapThreshold) or (len(entryListLocal[i-1]['translated_text']) + len(entryListLocal[i]['translated_text']) > maxCharacters):
                        considerPrev = False
                except TypeError:
                    considerPrev = False

            else:
                continue

            # Continue to next loop if neither are considered
            if not considerNext and not considerPrev:
                continue

            # Should only reach this point if two entries are to be combined
            if data['char_rate'] > charRateGoal:
                # If both are to be considered, then choose the one with the lower char_rate
                if considerNext and considerPrev:
                    if nextDiff < prevDiff:
                        combine_with_next()
                        noMorePossibleCombines = False
                        break
                    else:
                        combine_with_prev()
                        noMorePossibleCombines = False
                        break
                # If only one is to be considered, then combine with that one
                elif considerNext:
                    combine_with_next()
                    noMorePossibleCombines = False
                    break
                elif considerPrev:
                    combine_with_prev()
                    noMorePossibleCombines = False
                    break
                else:
                    print(f"Error U: Should not reach this point! Current entry = {i}")
                    print(f"Current Entry Text = {data['text']}")
                    continue

            elif data['char_rate'] < charRateGoal:
                # If both are to be considered, then choose the one with the higher char_rate
                if considerNext and considerPrev:
                    if nextDiff > prevDiff:
                        combine_with_next()
                        noMorePossibleCombines = False
                        break
                    else:
                        combine_with_prev()
                        noMorePossibleCombines = False
                        break
                # If only one is to be considered, then combine with that one
                elif considerNext:
                    combine_with_next()
                    noMorePossibleCombines = False
                    break
                elif considerPrev:
                    combine_with_prev()
                    noMorePossibleCombines = False
                    break
                else:
                    print(f"Error L: Should not reach this point! Index = {i}")
                    print(f"Current Entry Text = {data['text']}")
                    continue

    return entryListLocal, noMorePossibleCombines
#-- End of combine_single_pass --
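
# Illustrative example (hypothetical entries): a short line such as "Hi." spoken over 2000 ms has a
# char_rate of only 1.5 chars/sec, far from the goal of 20, so it is a high-priority merge candidate.
# Combining it with a neighbor concatenates 'text' and 'translated_text', extends 'end_ms', and joins
# the SRT timestamps, e.g.
#   '00:00:10,000 --> 00:00:12,000' + '00:00:12,050 --> 00:00:13,500'
#   -> '00:00:10,000 --> 00:00:13,500'
# provided the gap between entries is under gapThreshold ms and the combined text stays under maxCharacters.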

#----------------------------------------------------------------------
# Calculate the number of characters per second for each subtitle entry
def calc_dict_speaking_rates(inputDict, dictKey='translated_text'):
    tempDict = copy.deepcopy(inputDict)
    for key, value in tempDict.items():
        tempDict[key]['char_rate'] = round(len(value[dictKey]) / (int(value['duration_ms']) / 1000), 2)
    return tempDict

def calc_list_speaking_rates(inputList, charRateGoal, dictKey='translated_text'):
    tempList = copy.deepcopy(inputList)
    for i in range(len(tempList)):
        # Calculate the number of characters per second based on the duration of the entry
        tempList[i]['char_rate'] = round(len(tempList[i][dictKey]) / (int(tempList[i]['duration_ms']) / 1000), 2)
        # Calculate the difference between the current char_rate and the goal char_rate - Absolute Value
        tempList[i]['char_rate_diff'] = abs(round(tempList[i]['char_rate'] - charRateGoal, 2))
    return tempList
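
# Worked example (hypothetical entry): a 'translated_text' of 55 characters with duration_ms = 2200
# gives char_rate = round(55 / 2.2, 2) = 25.0, and with a charRateGoal of 20 the char_rate_diff is
# abs(25.0 - 20) = 5.0, which is the value combine_single_pass() sorts on to decide which entries
# to try merging first.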