# audio-separator / main.py
# rcastriotta's picture
# Update main.py
# 1febb2e verified
# raw
# history blame contribute delete
# No virus
# 4.63 kB
from fastapi import FastAPI, HTTPException, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from audio_separator.separator import Separator
import ffmpeg
from datetime import datetime
import logging
import os
import uuid
from youtube_transcript_api import YouTubeTranscriptApi
import asyncio
from fastapi.concurrency import run_in_threadpool
from concurrent.futures import ThreadPoolExecutor
# FastAPI application object; the /isolate and /subtitles routes register on it.
app = FastAPI()
# Directory used for the intermediate ffmpeg output and passed to the separator
# as its output directory.
tmp_directory = "tmp"
# One Separator instance is created at import time and shared by every request.
separator = Separator(output_dir=tmp_directory, log_level=logging.INFO)
# Raise the root logger to INFO so the logging.info() calls in this module are emitted.
logging.getLogger().setLevel(logging.INFO)
# The model is loaded once, at import time, rather than per request.
separator.load_model("UVR-MDX-NET-Inst_Main.onnx")
# Thread pools for blocking work started from the async handlers:
# extractionExecuter runs separator.separate, ffmpegExecuter runs the ffmpeg call.
# NOTE(review): up to 8 threads may call the shared `separator` concurrently —
# confirm that Separator.separate is safe to use that way.
extractionExecuter = ThreadPoolExecutor(max_workers=8)
ffmpegExecuter = ThreadPoolExecutor(max_workers=8)
# Request body for POST /isolate.
class IsolationRequest(BaseModel):
    # Media location handed to ffmpeg as its input.
    url: str
    # Seek offset in seconds; forwarded to ffmpeg as the `ss` input option.
    start_time: float
    # Length of the clip to extract; forwarded to ffmpeg as the `t` output option.
    duration_seconds: float
def _remove_temp_files(paths):
    """Delete every file in ``paths``; log a warning instead of raising on failure."""
    for path in paths:
        # Each file is removed independently so one failure (e.g. the file was
        # never created) does not prevent the remaining files from being deleted.
        try:
            os.remove(path)
        except OSError as e:
            logging.warning(
                f"Error occurred while cleaning up temporary files: {str(e)}"
            )


@app.post("/isolate")
async def isolate_voice(request: IsolationRequest):
    """Extract a clip from ``request.url`` and return its primary separated stem.

    The clip is cut with ffmpeg into a uniquely named WAV file under
    ``tmp_directory``, run through the shared ``separator`` on a worker thread,
    and the first of the two stems it reports is returned as ``audio/wav``.

    Args:
        request: URL, start offset (seconds) and duration of the clip.

    Returns:
        A ``Response`` whose body is the bytes of the primary stem file.

    Raises:
        HTTPException: status 500 if extraction, separation, or reading the
            result fails for any reason.
    """
    media_url = request.url
    start_seconds = request.start_time
    duration_seconds = request.duration_seconds

    extracted_audio_path = f"{tmp_directory}/{uuid.uuid4()}.wav"
    # Every file that must be gone by the time we return. It is extended as
    # files come into existence, so cleanup never touches an unbound name when
    # an earlier step fails.
    temp_files = [extracted_audio_path]
    try:
        # TODO switch to CUDA
        await extract_audio(
            media_url, start_seconds, duration_seconds, extracted_audio_path
        )
        # Separation is blocking, so it runs on the dedicated thread pool.
        stem_files = await asyncio.get_running_loop().run_in_executor(
            extractionExecuter,
            separator.separate,
            extracted_audio_path,
        )
        stem_files = list(stem_files)
        # Register the outputs for cleanup before validating their count, so
        # they are removed even if the unpacking below fails.
        temp_files.extend(f"{tmp_directory}/{name}" for name in stem_files)
        # Exactly two stems are expected; only the primary one is returned.
        primary_stem_output_path, _ = stem_files
        with open(f"{tmp_directory}/{primary_stem_output_path}", "rb") as f:
            isolated_audio_data = f.read()
    except Exception as e:
        # logging.exception keeps the same message and adds the traceback.
        logging.exception(f"An error occurred: {str(e)}")
        raise HTTPException(
            status_code=500, detail="An error occurred during vocal isolation"
        ) from e
    finally:
        _remove_temp_files(temp_files)
    return Response(content=isolated_audio_data, media_type="audio/wav")
async def extract_audio(
    media_url: str, start_seconds: float, duration_seconds: float, output_path: str
):
    """Cut a clip out of ``media_url`` with ffmpeg and write it to ``output_path`` as WAV.

    The blocking ffmpeg invocation runs on ``ffmpegExecuter`` so the event loop
    stays responsive; the elapsed time is logged at INFO level on success.

    Args:
        media_url: Input passed to ffmpeg.
        start_seconds: Seek position, passed as the ``ss`` input option.
        duration_seconds: Clip length, passed as the ``t`` output option.
        output_path: Destination file for the extracted WAV audio.

    Raises:
        Whatever the ffmpeg ``run()`` call raises when the command fails.
    """

    # NOTE(review): media_url comes straight from the request body and is given
    # to ffmpeg unvalidated — confirm that callers are trusted, or restrict the
    # accepted schemes/hosts before exposing this endpoint publicly.
    def run_ffmpeg():
        (
            ffmpeg.input(media_url, ss=start_seconds)
            .output(output_path, format="wav", t=duration_seconds)
            .global_args("-loglevel", "error", "-hide_banner")
            .global_args("-nostats")
            .run()
        )

    start_time = datetime.now()
    # Runs on the dedicated ffmpeg thread pool (not the loop's default executor).
    await asyncio.get_running_loop().run_in_executor(ffmpegExecuter, run_ffmpeg)
    end_time = datetime.now()
    logging.info(
        f"Audio extraction took {(end_time - start_time).total_seconds()} seconds"
    )
def scrape_subtitles(video_id, translate_to, translate_from):
    """Fetch subtitles for ``video_id`` in ``translate_to``, translating if needed.

    Three strategies are tried in order, each one best-effort:

    1. a transcript that already exists in ``translate_to``;
    2. the ``translate_from`` transcript translated to ``translate_to``;
    3. any other listed transcript translated to ``translate_to``.

    Args:
        video_id: Video identifier passed to ``YouTubeTranscriptApi.list_transcripts``.
        translate_to: Language code of the subtitles wanted.
        translate_from: Language code of the transcript to translate from.

    Returns:
        The fetched transcript from the first strategy that succeeds, or
        ``None`` when none of them does.

    Raises:
        Whatever ``YouTubeTranscriptApi.list_transcripts`` raises; only the
        per-strategy failures are suppressed.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(
        video_id,
    )

    # Failures below are expected (language missing, not translatable, ...), so
    # they are only logged at debug level. `except Exception` rather than a bare
    # `except` lets KeyboardInterrupt / SystemExit propagate.

    # see if translation already exists
    try:
        return transcript_list.find_transcript([translate_to]).fetch()
    except Exception as e:
        logging.debug("No existing %s transcript: %s", translate_to, e)

    # find transcription in video language
    try:
        return (
            transcript_list.find_transcript([translate_from])
            .translate(translate_to)
            .fetch()
        )
    except Exception as e:
        logging.debug(
            "Could not translate %s transcript to %s: %s",
            translate_from,
            translate_to,
            e,
        )

    # search for any other translatable languages
    for transcript in transcript_list:
        try:
            return transcript.translate(translate_to).fetch()
        except Exception as e:
            logging.debug("Transcript not translatable to %s: %s", translate_to, e)
            continue
    return None
# Language codes that must be rewritten before being used for transcript lookup.
_LANGUAGE_CODE_ALIASES = {
    "he": "iw",
    "zh": "zh-Hans",
    "zh-TW": "zh-Hant",
}


def format_language_code(lang: str) -> str:
    """Normalize a language tag to the code used for transcript lookup.

    The full tag is looked up in the alias table first. Otherwise the region
    suffix is dropped and the base code is looked up as well, so that
    region-qualified tags get the same alias as their base language
    (``"he-IL"`` -> ``"iw"``) instead of bypassing the table.

    Args:
        lang: Language tag such as ``"en"``, ``"en-US"`` or ``"zh-TW"``.

    Returns:
        The aliased code if one applies, otherwise the base language code.
    """
    if lang in _LANGUAGE_CODE_ALIASES:
        return _LANGUAGE_CODE_ALIASES[lang]
    base = lang.split("-")[0]
    # NOTE(review): every zh-* tag without its own entry (e.g. zh-HK) now falls
    # back to the "zh" alias; add an explicit entry if another script is wanted.
    return _LANGUAGE_CODE_ALIASES.get(base, base)
# Request body for POST /subtitles.
class SubtitleRequest(BaseModel):
    # Video identifier passed to the transcript lookup.
    video_id: str
    # Language the subtitles should be returned in (normalized by format_language_code).
    translate_to: str
    # Language of the transcript to translate from (normalized by format_language_code).
    translate_from: str
@app.post("/subtitles")
async def get_subtitles(request: SubtitleRequest):
    """Return subtitles for a video in the requested language.

    Both language codes are normalized with ``format_language_code`` and the
    blocking lookup runs in the thread pool via ``run_in_threadpool``.

    Args:
        request: Video id plus target and source language codes.

    Returns:
        A ``JSONResponse`` (200) with the subtitles, or a plain ``Response``
        with status 400 and body ``"Not available"`` when none were found.

    Raises:
        HTTPException: status 500 if the lookup or serialization fails.
    """
    try:
        subtitles = await run_in_threadpool(
            scrape_subtitles,
            request.video_id,
            format_language_code(request.translate_to),
            format_language_code(request.translate_from),
        )
        if subtitles is None:
            return Response("Not available", 400)
        return JSONResponse(subtitles, 200)
    except Exception as e:
        # logging.warn is a deprecated alias; logging.warning is the supported name.
        logging.warning(e)
        raise HTTPException(
            status_code=500, detail="An error occurred while getting subtitles"
        ) from e
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)