import base64 import json import os import tempfile import time import audioread import numpy as np import torch from torchaudio.transforms import Resample from aip_trainer import WordMatching as wm, app_logger from aip_trainer import pronunciationTrainer trainer_SST_lambda = { 'de': pronunciationTrainer.getTrainer("de"), 'en': pronunciationTrainer.getTrainer("en") } transform = Resample(orig_freq=48000, new_freq=16000) def lambda_handler(event, context): data = json.loads(event['body']) real_text = data['title'] file_bytes = base64.b64decode( data['base64Audio'][22:].encode('utf-8')) language = data['language'] if len(real_text) == 0: return { 'statusCode': 200, 'headers': { 'Access-Control-Allow-Headers': '*', 'Access-Control-Allow-Credentials': "true", 'Access-Control-Allow-Origin': 'http://127.0.0.1:3000/', 'Access-Control-Allow-Methods': 'OPTIONS,POST,GET' }, 'body': '' } start0 = time.time() with tempfile.NamedTemporaryFile(prefix="temp_sound_speech_score_", suffix=".ogg", delete=False) as f1: f1.write(file_bytes) duration = time.time() - start0 app_logger.info(f'Saved binary in file in {duration}s.') random_file_name = f1.name start = time.time() app_logger.info(f'Loading .ogg file file {random_file_name} ...') signal, _ = audioread_load(random_file_name) duration = time.time() - start app_logger.info(f'Read .ogg file {random_file_name} in {duration}s.') signal = transform(torch.Tensor(signal)).unsqueeze(0) duration = time.time() - start app_logger.info(f'Loaded .ogg file {random_file_name} in {duration}s.') language_trainer_sst_lambda = trainer_SST_lambda[language] app_logger.info('language_trainer_sst_lambda: preparing...') result = language_trainer_sst_lambda.processAudioForGivenText(signal, real_text) app_logger.info(f'language_trainer_sst_lambda: result: {result}...') start = time.time() os.remove(random_file_name) duration = time.time() - start app_logger.info(f'Deleted file {random_file_name} in {duration}s.') start = time.time() real_transcripts_ipa = ' '.join( [word[0] for word in result['real_and_transcribed_words_ipa']]) matched_transcripts_ipa = ' '.join( [word[1] for word in result['real_and_transcribed_words_ipa']]) real_transcripts = ' '.join( [word[0] for word in result['real_and_transcribed_words']]) matched_transcripts = ' '.join( [word[1] for word in result['real_and_transcribed_words']]) words_real = real_transcripts.lower().split() mapped_words = matched_transcripts.split() is_letter_correct_all_words = '' for idx, word_real in enumerate(words_real): mapped_letters, _ = wm.get_best_mapped_words( mapped_words[idx], word_real ) is_letter_correct = wm.getWhichLettersWereTranscribedCorrectly( word_real, mapped_letters) # , mapped_letters_indices) is_letter_correct_all_words += ''.join([str(is_correct) for is_correct in is_letter_correct]) + ' ' pair_accuracy_category = ' '.join( [str(category) for category in result['pronunciation_categories']]) duration = time.time() - start duration_tot = time.time() - start0 app_logger.info(f'Time to post-process results: {duration}, tot_duration:{duration_tot}.') res = {'real_transcript': result['recording_transcript'], 'ipa_transcript': result['recording_ipa'], 'pronunciation_accuracy': str(int(result['pronunciation_accuracy'])), 'real_transcripts': real_transcripts, 'matched_transcripts': matched_transcripts, 'real_transcripts_ipa': real_transcripts_ipa, 'matched_transcripts_ipa': matched_transcripts_ipa, 'pair_accuracy_category': pair_accuracy_category, 'start_time': result['start_time'], 'end_time': result['end_time'], 'is_letter_correct_all_words': is_letter_correct_all_words} return json.dumps(res) # From Librosa def calc_start_end(sr_native, time_position, n_channels): return int(np.round(sr_native * time_position)) * n_channels def audioread_load(path, offset=0.0, duration=None, dtype=np.float32): """Load an audio buffer using audioread. This loads one block at a time, and then concatenates the results. """ y = [] app_logger.debug(f"reading audio file at path:{path} ...") with audioread.audio_open(path) as input_file: sr_native = input_file.samplerate n_channels = input_file.channels s_start = calc_start_end(sr_native, offset, n_channels) if duration is None: s_end = np.inf else: duration = calc_start_end(sr_native, duration, n_channels) s_end = duration + s_start n = 0 for frame in input_file: frame = buf_to_float(frame, dtype=dtype) n_prev = n n = n + len(frame) if n < s_start: # offset is after the current frame # keep reading continue if s_end < n_prev: # we're off the end. stop reading break if s_end < n: # the end is in this frame. crop. frame = frame[: s_end - n_prev] if n_prev <= s_start <= n: # beginning is in this frame frame = frame[(s_start - n_prev):] # tack on the current frame y.append(frame) if y: y = np.concatenate(y) if n_channels > 1: y = y.reshape((-1, n_channels)).T else: y = np.empty(0, dtype=dtype) return y, sr_native # From Librosa def buf_to_float(x, n_bytes=2, dtype=np.float32): """Convert an integer buffer to floating point values. This is primarily useful when loading integer-valued wav data into numpy arrays. Parameters ---------- x : np.ndarray [dtype=int] The integer-valued data buffer n_bytes : int [1, 2, 4] The number of bytes per sample in ``x`` dtype : numeric type The target output type (default: 32-bit float) Returns ------- x_float : np.ndarray [dtype=float] The input data buffer cast to floating point """ # Invert the scale of the data scale = 1.0 / float(1 << ((8 * n_bytes) - 1)) # Construct the format string fmt = "