Rectifier / App /Generate /database /DescriptAPI.py
Mbonea's picture
retries if it fails
2bec257
raw
history blame
7.87 kB
import asyncio
import aiohttp
import os, uuid
from collections import deque
import wave
import uuid
from pydub import AudioSegment
import wave
import struct
def concatenate_wave_files(input_file_paths, output_file_path):
"""
Concatenates multiple wave files and saves the result to a new file.
:param input_file_paths: A list of paths to the input wave files.
:param output_file_path: The path to the output wave file.
"""
# Check if input file paths are provided
if not input_file_paths:
raise ValueError("No input file paths provided.")
# Validate output file path
if not output_file_path:
raise ValueError("Output file path is empty.")
# Validate input file paths
for input_file_path in input_file_paths:
if not input_file_path:
raise ValueError("Empty input file path found.")
# Validate and get parameters from the first input file
with wave.open(input_file_paths[0], "rb") as input_file:
n_channels = input_file.getnchannels()
sampwidth = input_file.getsampwidth()
framerate = input_file.getframerate()
comptype = input_file.getcomptype()
compname = input_file.getcompname()
# Open the output file for writing
output_file = wave.open(output_file_path, "wb")
output_file.setnchannels(n_channels)
output_file.setsampwidth(sampwidth)
output_file.setframerate(framerate)
output_file.setcomptype(comptype, compname)
# Concatenate and write data from all input files to the output file
for input_file_path in input_file_paths:
with wave.open(input_file_path, "rb") as input_file:
output_file.writeframes(input_file.readframes(input_file.getnframes()))
# Close the output file
output_file.close()
print(
f"Successfully concatenated {len(input_file_paths)} files into {output_file_path}"
)
# # Example usage
# input_files = ["./tmp/" + i for i in os.listdir("./tmp")]
# output_file = "./concatenated_output.wav"
# concatenate_wave_files(input_files, output_file)
def concatenate_wav_files(input_files, file_directory):
print(input_files)
output_file = file_directory + str(uuid.uuid4()) + "final.wav"
# Initialize variables for output file
output = None
output_params = None
try:
# Open output file for writing
output = wave.open(output_file, "wb")
# Loop through input files
for input_file in input_files:
with wave.open(input_file, "rb") as input_wav:
# If this is the first input file, set output file parameters
if output_params is None:
output_params = input_wav.getparams()
output.setparams(output_params)
# Otherwise, ensure consistency of parameters
else:
pass
# if input_wav.getparams() != output_params:
# raise ValueError(
# "Input file parameters do not match output file parameters."
# )
# Read data from input file and write to output file
output.writeframes(input_wav.readframes(input_wav.getnframes()))
finally:
# Close output file
if output is not None:
output.close()
return (output_file,)
class Speak:
def __init__(self, api_url="https://yakova-embedding.hf.space", dir="./tmp"):
self.api_url = api_url
self.dir = dir
async def _make_transcript(self, links, text):
data = {"audio_url": links, "text": text}
tries = 0
max_retries = 10
while tries < max_retries:
try:
response_data = await self._make_request(
"post", "descript_transcript", json=data
)
if isinstance(response_data, dict):
return response_data
except Exception as e:
print(f"Attempt {tries + 1} failed: {e}")
tries += 1
await asyncio.sleep(1) # Adding a delay between retries
raise Exception("Max retries reached. Unable to get a valid response.")
async def _make_request(self, method, endpoint, json=None):
async with aiohttp.ClientSession() as session:
async with getattr(session, method)(
f"{self.api_url}/{endpoint}", json=json
) as response:
return await response.json()
async def say(self, text, speaker="Gabi"):
data = {"text": text, "speaker": speaker}
response_data = await self._make_request("post", "descript_tts", json=data)
tts_id = response_data["id"]
# Poll the status endpoint until the TTS is ready
while True:
status_data = await self._make_request(
"post", "descript_status", json={"id": tts_id}
)
print(status_data)
if "status" in status_data:
if status_data["status"] == "done":
audio_url = status_data["url"]
temp = await self.download_file(audio_url)
return audio_url, temp
else:
pass
await asyncio.sleep(1)
async def download_file(self, url):
filename = str(uuid.uuid4()) + ".wav"
os.makedirs(self.dir, exist_ok=True)
save_path = os.path.join(self.dir, filename)
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
with open(save_path, "wb") as file:
while True:
chunk = await response.content.read(1024)
if not chunk:
break
file.write(chunk)
return save_path
async def download_file_with_aria2c(self, url):
filename = str(uuid.uuid4()) + ".wav"
os.makedirs(self.dir, exist_ok=True)
save_path = os.path.join(self.dir, filename)
print(url)
command = f"aria2c {url} -o {save_path}"
process = await asyncio.create_subprocess_shell(
command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
print(f"File downloaded successfully to {save_path}")
else:
print(f"Failed to download file. Error: {stderr.decode()}")
return save_path
async def process_narrations(narrations):
speak = Speak()
tasks = deque()
results = []
files = []
async def process_task():
while tasks:
text = tasks.popleft()
result = await speak.say(text)
_, temp = result
results.append(result)
files.append(temp)
for narration in narrations:
tasks.append(narration)
if len(tasks) >= 2:
await asyncio.gather(*[process_task() for _ in range(2)])
# Process remaining tasks
await asyncio.gather(*[process_task() for _ in range(len(tasks))])
concatinated_file = concatenate_wav_files(files, speak.dir)
audio_file = AudioSegment.from_file(concatinated_file, format="wav")
duration_in_seconds = int(len(audio_file) / 1000)
return results, [concatinated_file, duration_in_seconds]
# duration = 0.23529411764705882
# # Example narrations
# narrations = [
# "Hello there ",
# "Hello there",
# "Hello there",
# "Hello there [space]",
# ]
# # Run the asyncio event loop
# async def main():
# results = await process_narrations(narrations)
# print("Results:", results)
# asyncio.run(main())