import base64
import json
import os
from pathlib import Path
import tempfile
import time

import audioread
import numpy as np
import torch
from torchaudio.transforms import Resample

from aip_trainer import WordMatching as wm, app_logger
from aip_trainer import pronunciationTrainer, sample_rate_start

trainer_SST_lambda = {
    'de': pronunciationTrainer.getTrainer("de"),
    'en': pronunciationTrainer.getTrainer("en")
}
transform = Resample(orig_freq=sample_rate_start, new_freq=16000)


def lambda_handler(event, context):
    data = json.loads(event['body'])

    real_text = data['title']
    base64Audio = data["base64Audio"]
    app_logger.debug(f"base64Audio:{base64Audio} ...")
    # skip the 22-character data-URL prefix (e.g. "data:audio/ogg;base64,") before decoding
    file_bytes_or_audiotmpfile = base64.b64decode(base64Audio[22:].encode('utf-8'))
    language = data['language']

    if len(real_text) == 0:
        return {
            'statusCode': 200,
            'headers': {
                'Access-Control-Allow-Headers': '*',
                'Access-Control-Allow-Credentials': "true",
                # a CORS origin value must not carry a trailing slash
                'Access-Control-Allow-Origin': 'http://127.0.0.1:3000',
                'Access-Control-Allow-Methods': 'OPTIONS,POST,GET'
            },
            'body': ''
        }

    output = get_speech_to_score_dict(
        real_text=real_text,
        file_bytes_or_audiotmpfile=file_bytes_or_audiotmpfile,
        language=language,
        remove_random_file=False
    )
    output = json.dumps(output)
    app_logger.debug(f"output: {output} ...")
    return output
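
# Hedged usage sketch for exercising lambda_handler locally: builds an event in
# the shape the handler expects. The "data:audio/ogg;base64," prefix matches the
# 22 characters stripped above; the file path, reference text and this helper's
# name are illustrative assumptions, not part of the module's API.
def _example_build_event(audio_path: str = "hello.ogg") -> dict:
    with open(audio_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("utf-8")
    body = {
        "title": "hello world",
        "base64Audio": "data:audio/ogg;base64," + encoded,
        "language": "en",
    }
    return {"body": json.dumps(body)}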

def get_speech_to_score_dict(
        real_text: str, file_bytes_or_audiotmpfile: str | bytes, language: str = "en",
        remove_random_file: bool = True, extension: str = ".ogg"):
    from soundfile import LibsndfileError
    app_logger.info(f"real_text:{real_text} ...")
    app_logger.debug(f"file_bytes:{file_bytes_or_audiotmpfile} ...")
    app_logger.info(f"language:{language} ...")

    if real_text is None or len(real_text) == 0:
        raise ValueError(f"cannot read an empty/None text: '{real_text}'...")
    if language is None or len(language) == 0:
        raise NotImplementedError(f"Not tested/supported with '{language}' language...")
    if not isinstance(file_bytes_or_audiotmpfile, (bytes, bytearray)) and (
            file_bytes_or_audiotmpfile is None
            or len(file_bytes_or_audiotmpfile) == 0
            or os.path.getsize(file_bytes_or_audiotmpfile) == 0):
        raise ValueError(f"cannot read an empty/None file: '{file_bytes_or_audiotmpfile}'...")

    start0 = time.time()
    random_file_name = file_bytes_or_audiotmpfile
    app_logger.debug(f"random_file_name:{random_file_name} ...")
    if isinstance(file_bytes_or_audiotmpfile, (bytes, bytearray)):
        app_logger.debug("writing streaming data to file on disk...")
        with tempfile.NamedTemporaryFile(prefix="temp_sound_speech_score_", suffix=extension, delete=False) as f1:
            f1.write(file_bytes_or_audiotmpfile)
            duration = time.time() - start0
            app_logger.info(f'Saved binary data to file in {duration}s.')
            random_file_name = f1.name

    start = time.time()
    app_logger.info(f'Loading {extension} file {random_file_name} ...')
    try:
        signal, samplerate = soundfile_load(random_file_name)
    except LibsndfileError as sfe:
        # https://github.com/beetbox/audioread/issues/144
        # deprecation warnings => pip install standard-aifc standard-sunau
        app_logger.error(f"Error reading file {random_file_name}: {sfe}, re-try with audioread...")
        try:
            signal, samplerate = audioread_load(random_file_name)
        except ModuleNotFoundError as mnfe:
            app_logger.error(f"Error reading file {random_file_name}: {mnfe}, try read https://github.com/beetbox/audioread/issues/144")
            raise mnfe

    duration = time.time() - start
    app_logger.info(f'Read {extension} file {random_file_name} in {duration}s.')
    signal_transformed = transform(torch.Tensor(signal)).unsqueeze(0)
    duration = time.time() - start
    app_logger.info(f'Loaded {extension} file {random_file_name} in {duration}s.')

    language_trainer_sst_lambda = trainer_SST_lambda[language]
    app_logger.info('language_trainer_sst_lambda: preparing...')
    result = language_trainer_sst_lambda.processAudioForGivenText(signal_transformed, real_text)
    app_logger.info(f'language_trainer_sst_lambda: result: {result}...')

    start = time.time()
    if remove_random_file:
        os.remove(random_file_name)
    duration = time.time() - start
    app_logger.info(f'Deleted file {random_file_name} in {duration}s.')

    start = time.time()
    real_transcripts_ipa = ' '.join(
        [word[0] for word in result['real_and_transcribed_words_ipa']])
    matched_transcripts_ipa = ' '.join(
        [word[1] for word in result['real_and_transcribed_words_ipa']])

    real_transcripts = ' '.join(
        [word[0] for word in result['real_and_transcribed_words']])
    matched_transcripts = ' '.join(
        [word[1] for word in result['real_and_transcribed_words']])

    words_real = real_transcripts.lower().split()
    mapped_words = matched_transcripts.split()

    is_letter_correct_all_words = ''
    for idx, word_real in enumerate(words_real):
        mapped_letters, _ = wm.get_best_mapped_words(mapped_words[idx], word_real)
        is_letter_correct = wm.getWhichLettersWereTranscribedCorrectly(
            word_real, mapped_letters)  # , mapped_letters_indices)
        is_letter_correct_all_words += ''.join([str(is_correct) for is_correct in is_letter_correct]) + ' '

    pair_accuracy_category = ' '.join(
        [str(category) for category in result['pronunciation_categories']])
    duration = time.time() - start
    duration_tot = time.time() - start0
    app_logger.info(f'Time to post-process results: {duration}, tot_duration:{duration_tot}.')
    pronunciation_accuracy = float(result['pronunciation_accuracy'])
    ipa_transcript = result['recording_ipa']

    return {
        'real_transcript': result['recording_transcript'],
        'ipa_transcript': ipa_transcript,
        'pronunciation_accuracy': float(f"{pronunciation_accuracy:.2f}"),
        'real_transcripts': real_transcripts,
        'matched_transcripts': matched_transcripts,
        'real_transcripts_ipa': real_transcripts_ipa,
        'matched_transcripts_ipa': matched_transcripts_ipa,
        'pair_accuracy_category': pair_accuracy_category,
        'start_time': result['start_time'],
        'end_time': result['end_time'],
        'is_letter_correct_all_words': is_letter_correct_all_words
    }


def get_speech_to_score_tuple(real_text: str, file_bytes_or_audiotmpfile: str | bytes, language: str = "en", remove_random_file: bool = True):
    output = get_speech_to_score_dict(
        real_text=real_text, file_bytes_or_audiotmpfile=file_bytes_or_audiotmpfile,
        language=language, remove_random_file=remove_random_file)
    real_transcripts = output['real_transcripts']
    is_letter_correct_all_words = output['is_letter_correct_all_words']
    pronunciation_accuracy = output['pronunciation_accuracy']
    ipa_transcript = output['ipa_transcript']
    real_transcripts_ipa = output['real_transcripts_ipa']
    end_time = [float(x) for x in output['end_time'].split(" ")]
    start_time = [float(x) for x in output['start_time'].split(" ")]
    num_words = len(end_time)
    app_logger.debug(f"start splitting recorded audio into {num_words} words...")
    audio_files, audio_durations = get_splitted_audio_file(audiotmpfile=file_bytes_or_audiotmpfile, start_time=start_time, end_time=end_time)
    output = {'audio_files': audio_files, "audio_durations": audio_durations, **output}
    first_audio_file = audio_files[0]
    return real_transcripts, is_letter_correct_all_words, pronunciation_accuracy, ipa_transcript, real_transcripts_ipa, num_words, first_audio_file, json.dumps(output)


def soundfile_write(audiofile: str | Path, data: np.ndarray, samplerate: int):
    import soundfile as sf
    sf.write(audiofile, data, samplerate)
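
# Hedged usage sketch for soundfile_write: writes a synthetic 440 Hz tone so
# the helper can be exercised without a real recording. The tone, duration and
# output path are illustrative assumptions.
def _example_write_sine(path: str = "sine_440hz.wav", samplerate: int = 16000) -> None:
    t = np.linspace(0.0, 1.0, samplerate, endpoint=False)
    soundfile_write(audiofile=path, data=0.5 * np.sin(2 * np.pi * 440.0 * t), samplerate=samplerate)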

def get_selected_word(idx_recorded_word: int, raw_json_output: str) -> tuple[str, str, float]:
    recognition_output = json.loads(raw_json_output)
    list_audio_files = recognition_output["audio_files"]
    real_transcripts = recognition_output["real_transcripts"]
    audio_durations = recognition_output["audio_durations"]
    real_transcripts_list = real_transcripts.split()
    app_logger.info(f"idx_recorded_word:{idx_recorded_word} ...")
    current_word = real_transcripts_list[idx_recorded_word]
    app_logger.info(f"current word:{current_word} ...")
    current_duration = audio_durations[idx_recorded_word]
    app_logger.info(f"current_duration:{current_duration} ...")
    return list_audio_files[idx_recorded_word], current_word, current_duration


def get_splitted_audio_file(audiotmpfile: str | Path, start_time: list[float], end_time: list[float], signal: np.ndarray | None = None, samplerate: int | None = None) -> tuple[list[str], list[float]]:
    import soundfile as sf
    audio_files = []
    audio_durations = []
    if signal is not None:
        # sf.SoundFile cannot be built from a raw ndarray, and a real path is
        # needed below to derive the per-word output file names, so persist
        # the in-memory signal to a temporary file first
        with tempfile.NamedTemporaryFile(prefix="temp_sound_split_", suffix=".wav", delete=False) as tmp:
            audiotmpfile = tmp.name
        sf.write(audiotmpfile, signal, samplerate)
    for n, (start_nth, end_nth) in enumerate(zip(start_time, end_time)):
        signal_nth, samplerate = soundfile_load(audiotmpfile, offset=start_nth, duration=end_nth - start_nth)
        audiofile = get_file_with_custom_suffix(audiotmpfile, f"_part{n}_start{start_nth}_end{end_nth}")
        soundfile_write(audiofile=audiofile, data=signal_nth, samplerate=samplerate)
        app_logger.info(f"audio file {audiofile} written...")
        audio_files.append(str(audiofile))
        duration = end_nth - start_nth
        app_logger.info(f"audio file {audiofile} has duration {duration}...")
        audio_durations.append(duration)
    return audio_files, audio_durations


def get_file_with_custom_suffix(basefile: str | Path, custom_suffix: str) -> Path:
    pathname = Path(basefile)
    dirname, filename_no_ext, filename_ext = pathname.parent, pathname.stem, pathname.suffix
    # pathname.suffix already carries the leading dot, so no separator is added here
    output_file = dirname / f"{filename_no_ext}{custom_suffix}{filename_ext}"
    return output_file


# From Librosa
def calc_start_end(sr_native, time_position, n_channels):
    return int(np.round(sr_native * time_position)) * n_channels


def soundfile_load(path: str | Path, offset: float = 0.0, duration: float | None = None, dtype=np.float32):
    """Load an audio buffer using soundfile.

    Taken from librosa.
    """
    import soundfile as sf

    if isinstance(path, sf.SoundFile):
        # If the user passed an existing soundfile object,
        # we can use it directly
        context = path
    else:
        # Otherwise, create the soundfile object
        context = sf.SoundFile(path)

    with context as sf_desc:
        sr_native = sf_desc.samplerate
        if offset:
            # Seek to the start of the target read
            sf_desc.seek(int(offset * sr_native))
        if duration is not None:
            frame_duration = int(duration * sr_native)
        else:
            frame_duration = -1

        # Load the target number of frames, and transpose to match librosa form
        y = sf_desc.read(frames=frame_duration, dtype=dtype, always_2d=False).T

    return y, sr_native
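
# Hedged usage sketch for soundfile_load: reads a 0.5 s window starting at
# 0.25 s, the same offset/duration slicing get_splitted_audio_file relies on to
# cut out single words. The input path is a hypothetical example.
def _example_load_window(path: str = "recording.ogg") -> tuple[np.ndarray, int]:
    return soundfile_load(path, offset=0.25, duration=0.5)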
""" y = [] app_logger.debug(f"reading audio file at path:{path} ...") with audioread.audio_open(path) as input_file: sr_native = input_file.samplerate n_channels = input_file.channels s_start = calc_start_end(sr_native, offset, n_channels) if duration is None: s_end = np.inf else: duration = calc_start_end(sr_native, duration, n_channels) s_end = duration + s_start n = 0 for frame in input_file: frame = buf_to_float(frame, dtype=dtype) n_prev = n n = n + len(frame) if n < s_start: # offset is after the current frame # keep reading continue if s_end < n_prev: # we're off the end. stop reading break if s_end < n: # the end is in this frame. crop. frame = frame[: s_end - n_prev] if n_prev <= s_start <= n: # beginning is in this frame frame = frame[(s_start - n_prev):] # tack on the current frame y.append(frame) if y: y = np.concatenate(y) if n_channels > 1: y = y.reshape((-1, n_channels)).T else: y = np.empty(0, dtype=dtype) return y, sr_native # From Librosa def buf_to_float(x, n_bytes=2, dtype=np.float32): """Convert an integer buffer to floating point values. This is primarily useful when loading integer-valued wav data into numpy arrays. Parameters ---------- x : np.ndarray [dtype=int] The integer-valued data buffer n_bytes : int [1, 2, 4] The number of bytes per sample in ``x`` dtype : numeric type The target output type (default: 32-bit float) Returns ------- x_float : np.ndarray [dtype=float] The input data buffer cast to floating point """ # Invert the scale of the data scale = 1.0 / float(1 << ((8 * n_bytes) - 1)) # Construct the format string fmt = "