"""
Copyright 2023 Balacoon

contains implementation
for Revoice request
"""
import asyncio
import base64
import hashlib
import json
import os
import ssl
import time
from typing import Optional, Tuple, Union

import numpy as np
import resampy
import websockets

# Sampling rate the audio is converted to before it is sent, and the rate
# reported alongside the returned audio.
_SAMPLE_RATE = 16000
# Inputs at or above these sizes are rejected by `service_request`.
_MAX_TARGET_SAMPLES = 30 * _SAMPLE_RATE
_MAX_SOURCE_SAMPLES = 60 * _SAMPLE_RATE
_MAX_SOURCE_CHARS = 256


def prepare_audio(audio: Tuple[int, np.ndarray]) -> Optional[np.ndarray]:
    """
    Ensure that audio is in int16 format, 16 kHz, mono.

    Parameters
    ----------
    audio:
        pair of (sampling rate, samples). Samples are either 1-D or 2-D with
        exactly two channels along one of the axes.

    Returns
    -------
    1-D int16 array sampled at 16 kHz, or None if the samples could not be
    reduced to a single channel.
    """
    sr, wav = audio

    # ensure proper type
    if wav.dtype == np.int32:
        # guard against np.max() raising on an empty array
        max_val = np.max(np.abs(wav)) if wav.size else 0
        # only rescale if the data really uses the 32-bit range
        mult = (32767.0 / 2**31) if max_val > 32768 else 1.0
        wav = (wav.astype(np.float32) * mult).astype(np.int16)
    elif wav.dtype == np.float32 or wav.dtype == np.float64:
        max_val = np.max(np.abs(wav)) if wav.size else 0.0
        # floats within [-1, 1] are treated as normalized audio
        mult = 32767.0 if max_val <= 1.0 else 1.0
        wav = (wav * mult).astype(np.int16)

    if wav.ndim == 2:
        # average channels; `elif` because after the first averaging the
        # array is 1-D and has no second axis to inspect. np.mean returns
        # float64, so cast back to keep the int16 contract.
        if wav.shape[0] == 2:
            wav = np.mean(wav, axis=0, keepdims=False).astype(np.int16)
        elif wav.shape[1] == 2:
            wav = np.mean(wav, axis=1, keepdims=False).astype(np.int16)
    if wav.ndim != 1:
        return None

    # ensure proper sampling rate (nothing to resample in an empty clip)
    if sr != _SAMPLE_RATE and wav.size:
        # `np.float` was an alias of the builtin float (float64) and has
        # been removed from NumPy, so spell the dtype out.
        wav = (wav / 32768.0).astype(np.float64)
        wav = resampy.resample(wav, sr, _SAMPLE_RATE)
        # clip so that resampling overshoot does not wrap around in int16
        wav = np.clip(wav * 32768.0, -32768.0, 32767.0).astype(np.int16)
    return wav


def create_signature(api_secret: str) -> str:
    """
    Helper function that creates the signature
    required to authenticate the request.

    The signature is the SHA-256 hex digest of the API secret concatenated
    with an integer derived from the current time.
    """
    # NOTE(review): time.time() is in seconds, so this value changes once per
    # 1000 seconds. Presumably the service derives the same value - confirm
    # before changing the divisor.
    int_time = int(time.time() / 1000)
    signature_input = (api_secret + str(int_time)).encode()
    signature = hashlib.sha256(signature_input).hexdigest()
    return signature


async def async_service_request(
    source_str: str,
    source: np.ndarray,
    target: np.ndarray,
    api_key: str,
    api_secret: str,
) -> Optional[np.ndarray]:
    """
    Send a request over a websocket and collect the reply.

    The endpoint URL is read from the ``endpoint`` environment variable.
    The request is a JSON object with base64-encoded ``target`` samples,
    ``api_key``, ``signature`` and either ``source_str`` (preferred when
    non-empty) or base64-encoded ``source`` samples.

    Returns
    -------
    Concatenation of all received messages interpreted as int16 samples, or
    None if target is empty, no source is given, or nothing was received.
    """
    if target is None or len(target) == 0:
        return None

    # Validate and build the request before connecting, so that no
    # connection is opened for a request that cannot be sent.
    request_dict = {
        "target": base64.b64encode(target.tobytes()).decode("utf-8"),
        "api_key": api_key,
        "signature": create_signature(api_secret),
    }
    if source_str:
        request_dict["source_str"] = source_str
    elif source is not None and len(source) > 0:
        request_dict["source"] = base64.b64encode(source.tobytes()).decode("utf-8")
    else:
        return None
    request = json.dumps(request_dict)

    ssl_context = ssl.create_default_context()
    async with websockets.connect(
        os.environ["endpoint"], close_timeout=1024, ssl=ssl_context
    ) as websocket:
        await websocket.send(request)

        # read reply until the connection is closed or goes silent
        result_lst = []
        while True:
            try:
                data = await asyncio.wait_for(websocket.recv(), timeout=30)
            except websockets.exceptions.ConnectionClosed:
                break
            except asyncio.TimeoutError:
                break
            if data is None:
                break
            result_lst.append(np.frombuffer(data, dtype="int16"))

    return np.concatenate(result_lst) if result_lst else None


def service_request(
    source_str: str,
    source_audio: Tuple[int, np.ndarray],
    target_audio: Tuple[int, np.ndarray],
    api_key: str,
    api_secret: str,
) -> Optional[Tuple[int, np.ndarray]]:
    """
    Prepare audio (has to be 16 kHz mono) and
    run a request to a voice conversion service.

    Returns
    -------
    None if the inputs are rejected: missing or unusable target audio,
    neither source text nor usable source audio, target of 30 seconds or
    more, source audio of 60 seconds or more, or source text longer than
    256 characters. Otherwise a pair of (16000, samples), where samples is
    whatever `async_service_request` returned, which may be None.
    """
    if target_audio is None:
        # nothing to prepare; reject like any other unusable target
        return None

    src = None
    if source_audio is not None:
        src = prepare_audio(source_audio)
    tgt = prepare_audio(target_audio)
    if tgt is None:
        return None
    if source_str is None and src is None:
        return None
    if len(tgt) >= _MAX_TARGET_SAMPLES:
        # too long
        return None
    if src is not None and len(src) >= _MAX_SOURCE_SAMPLES:
        return None
    if source_str is not None and len(source_str) > _MAX_SOURCE_CHARS:
        return None

    res = asyncio.run(
        async_service_request(source_str, src, tgt, api_key, api_secret)
    )
    return _SAMPLE_RATE, res