"""FastAPI service exposing audio stem isolation and YouTube subtitle lookup."""

import asyncio
import logging
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import ffmpeg
from audio_separator.separator import Separator
from fastapi import FastAPI, HTTPException, Response
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from youtube_transcript_api import YouTubeTranscriptApi

app = FastAPI()

# Directory that receives both the ffmpeg extract and the separator's stems.
tmp_directory = "tmp"
separator = Separator(output_dir=tmp_directory, log_level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
separator.load_model("UVR-MDX-NET-Inst_Main.onnx")

# extractionExecuter runs separator.separate; ffmpegExecuter runs ffmpeg.
# NOTE(review): the single Separator instance is shared by every worker
# thread -- confirm it is safe to call concurrently.
extractionExecuter = ThreadPoolExecutor(max_workers=8)
ffmpegExecuter = ThreadPoolExecutor(max_workers=8)


class IsolationRequest(BaseModel):
    """Request body for ``POST /isolate``."""

    # Media location handed to ffmpeg as its input.
    url: str
    # Offset into the media, passed to ffmpeg as ``ss``.
    start_time: float
    # Length of the clip to extract, passed to ffmpeg as ``t``.
    duration_seconds: float


def _remove_temp_files(paths):
    """Best-effort removal of every path in *paths*.

    Each file is removed independently so that one failure does not leave the
    remaining files behind. Files that were never created (the request failed
    before producing them) are skipped silently; any other OS error is logged
    as a warning and never raised.
    """
    for path in paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            # Expected when the request failed before this file was written.
            continue
        except OSError as e:
            logging.warning(
                f"Error occurred while cleaning up temporary files: {str(e)}"
            )


@app.post("/isolate")
async def isolate_voice(request: IsolationRequest):
    """Extract a clip from the requested media and return its primary stem.

    The clip is written to a uniquely named WAV file in ``tmp_directory``,
    split by the module-level ``separator``, and the primary stem file is
    returned as the response body. All temporary files are removed before the
    function returns, whether it succeeds or fails.

    Args:
        request: URL, start offset and duration of the clip to process.

    Returns:
        A ``Response`` with media type ``audio/wav`` holding the bytes of the
        primary stem file.

    Raises:
        HTTPException: Status 500 if extraction, separation or reading the
            result fails for any reason.
    """
    extracted_audio_path = f"{tmp_directory}/{uuid.uuid4()}.wav"
    # Everything this request may have written. Stem files are appended as soon
    # as the separator reports them, so cleanup never touches an unbound name
    # when an earlier step fails.
    temp_paths = [extracted_audio_path]

    try:
        # TODO switch to CUDA
        await extract_audio(
            request.url,
            request.start_time,
            request.duration_seconds,
            extracted_audio_path,
        )

        stem_paths = list(
            await asyncio.get_running_loop().run_in_executor(
                extractionExecuter,
                separator.separate,
                extracted_audio_path,
            )
        )
        # Register the outputs before unpacking so they are still cleaned up
        # if the separator returns an unexpected number of stems.
        temp_paths.extend(f"{tmp_directory}/{name}" for name in stem_paths)
        primary_stem_output_path, _secondary_stem_output_path = stem_paths

        with open(f"{tmp_directory}/{primary_stem_output_path}", "rb") as f:
            isolated_audio_data = f.read()
    except Exception as e:
        logging.exception(f"An error occurred: {str(e)}")
        raise HTTPException(
            status_code=500, detail="An error occurred during vocal isolation"
        ) from e
    finally:
        _remove_temp_files(temp_paths)

    return Response(content=isolated_audio_data, media_type="audio/wav")


async def extract_audio(
    media_url: str, start_seconds: float, duration_seconds: float, output_path: str
):
    """Write a WAV clip of *media_url* to *output_path* using ffmpeg.

    The blocking ffmpeg invocation runs on ``ffmpegExecuter`` so the event
    loop stays free, and the elapsed wall-clock time is logged at INFO level.

    Args:
        media_url: Input passed to ffmpeg.
        start_seconds: Seek offset, passed as ffmpeg's ``ss`` input option.
        duration_seconds: Clip length, passed as ffmpeg's ``t`` output option.
        output_path: Destination file; written in ``wav`` format.

    Raises:
        Whatever the ffmpeg ``run()`` call raises is propagated to the caller.
    """

    # NOTE(review): media_url comes straight from the request body and is
    # given to ffmpeg unvalidated -- confirm whether local paths or internal
    # URLs need to be rejected before this point.
    def _run_ffmpeg():
        return (
            ffmpeg.input(media_url, ss=start_seconds)
            .output(output_path, format="wav", t=duration_seconds)
            .global_args("-loglevel", "error", "-hide_banner")
            .global_args("-nostats")
            .run()
        )

    start_time = datetime.now()
    # Runs on the dedicated ffmpeg pool, not the loop's default executor.
    await asyncio.get_running_loop().run_in_executor(ffmpegExecuter, _run_ffmpeg)
    end_time = datetime.now()

    logging.info(
        f"Audio extraction took {(end_time - start_time).total_seconds()} seconds"
    )


def scrape_subtitles(video_id, translate_to, translate_from):
    """Fetch subtitles for *video_id* in the *translate_to* language.

    Three strategies are tried in order, and the first that succeeds wins:

    1. a transcript that already exists in ``translate_to``;
    2. the ``translate_from`` transcript translated to ``translate_to``;
    3. any listed transcript that can be translated to ``translate_to``.

    Args:
        video_id: Video identifier passed to ``YouTubeTranscriptApi``.
        translate_to: Language code of the wanted subtitles.
        translate_from: Language code of the preferred source transcript.

    Returns:
        The fetched transcript, or ``None`` if every strategy failed.

    Raises:
        Errors from listing the transcripts are not caught here and propagate
        to the caller.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(
        video_id,
    )

    # see if translation already exists
    try:
        return transcript_list.find_transcript([translate_to]).fetch()
    except Exception:
        # Fall through to the next strategy. Exception (not a bare except) so
        # KeyboardInterrupt / SystemExit are not swallowed.
        pass

    # find transcription in video language
    try:
        return (
            transcript_list.find_transcript([translate_from])
            .translate(translate_to)
            .fetch()
        )
    except Exception:
        pass

    # search for any other translatable languages
    for transcript in transcript_list:
        try:
            return transcript.translate(translate_to).fetch()
        except Exception:
            continue

    return None


def format_language_code(lang: str) -> str:
    """Map a language code to the form used for the transcript lookup.

    Codes with an exact entry in the table are replaced by it. Any other code
    is cut at its first hyphen (``"en-US"`` -> ``"en"``); the shortened code is
    not looked up in the table again.
    """
    mapping = {
        "he": "iw",
        "zh": "zh-Hans",
        "zh-TW": "zh-Hant",
    }
    return mapping.get(lang, lang.split("-")[0])


class SubtitleRequest(BaseModel):
    """Request body for ``POST /subtitles``."""

    # Video identifier passed to the transcript API.
    video_id: str
    # Language code the subtitles should be returned in.
    translate_to: str
    # Language code of the preferred source transcript.
    translate_from: str


@app.post("/subtitles")
async def get_subtitles(request: SubtitleRequest):
    """Return subtitles for the requested video as JSON.

    Both language codes are normalised with ``format_language_code`` and the
    blocking lookup runs in the thread pool.

    Returns:
        A ``JSONResponse`` (status 200) with the subtitles, or a plain
        ``Response`` with body ``"Not available"`` and status 400 when no
        subtitles could be obtained.

    Raises:
        HTTPException: Status 500 if the lookup raises.
    """
    try:
        subtitles = await run_in_threadpool(
            scrape_subtitles,
            request.video_id,
            format_language_code(request.translate_to),
            format_language_code(request.translate_from),
        )
        if subtitles is None:
            return Response("Not available", 400)

        return JSONResponse(subtitles, 200)
    except Exception as e:
        # logging.warn is a deprecated alias of logging.warning.
        logging.warning(e)
        raise HTTPException(
            status_code=500, detail="An error occurred while getting subtitles"
        ) from e


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)