File size: 4,634 Bytes
17ff85c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1febb2e
 
17ff85c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1febb2e
17ff85c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1febb2e
17ff85c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
from fastapi import FastAPI, HTTPException, Response
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from audio_separator.separator import Separator
import ffmpeg
from datetime import datetime
import logging
import os
import uuid
from youtube_transcript_api import YouTubeTranscriptApi
import asyncio
from fastapi.concurrency import run_in_threadpool
from concurrent.futures import ThreadPoolExecutor

# ASGI application object; the route handlers below register themselves on it.
app = FastAPI()
# Directory holding the extracted clip and the separator's output files.
# NOTE(review): nothing visible in this file creates the directory — confirm
# it exists (or is created by Separator) before the first request.
tmp_directory = "tmp"
# One Separator instance shared by every request, writing into tmp_directory.
separator = Separator(output_dir=tmp_directory, log_level=logging.INFO)
# Set the root logger's level to INFO.
logging.getLogger().setLevel(logging.INFO)
# Load the separation model once at import time rather than per request.
separator.load_model("UVR-MDX-NET-Inst_Main.onnx")

# Thread pool used by isolate_voice to run separator.separate() off the event
# loop. NOTE(review): up to 8 threads can call the single shared `separator`
# concurrently — confirm Separator tolerates that.
extractionExecuter = ThreadPoolExecutor(max_workers=8)
# Thread pool used by extract_audio to run the blocking ffmpeg call.
ffmpegExecuter = ThreadPoolExecutor(max_workers=8)


class IsolationRequest(BaseModel):
    """Request body accepted by ``POST /isolate``."""

    # Media location; handed unchanged to ffmpeg as its input.
    url: str
    # Seek offset into the media; the handler forwards it to ffmpeg as ``ss``.
    start_time: float
    # Length of the clip to extract; forwarded to ffmpeg as ``t``.
    duration_seconds: float


def _remove_temp_file(path: str) -> None:
    """Delete *path* on a best-effort basis; never raises on failure."""
    try:
        os.remove(path)
    except FileNotFoundError:
        # The step that should have produced the file failed first; that
        # failure has already been reported, so there is nothing to clean up.
        pass
    except OSError as e:
        logging.warning(
            f"Error occurred while cleaning up temporary files: {str(e)}"
        )


@app.post("/isolate")
async def isolate_voice(request: IsolationRequest):
    """Extract a clip from the requested media and return its primary stem.

    The clip is cut with ffmpeg into a uniquely named WAV file, split by the
    shared ``separator``, and the first output file is returned as
    ``audio/wav``. All temporary files are deleted before the response is
    sent, whether or not the request succeeded.

    Raises:
        HTTPException: status 500 if extraction, separation or reading the
            result fails for any reason.
    """
    extracted_audio_path = f"{tmp_directory}/{uuid.uuid4()}.wav"
    # Every path listed here is removed in ``finally``. Tracking the files as
    # they come into existence means a failure part-way through never makes
    # the cleanup touch a name that was not assigned yet.
    temp_paths = [extracted_audio_path]

    try:
        # TODO switch to CUDA
        await extract_audio(
            request.url,
            request.start_time,
            request.duration_seconds,
            extracted_audio_path,
        )

        # Separation is blocking, so it runs on its dedicated thread pool.
        stem_names = await asyncio.get_running_loop().run_in_executor(
            extractionExecuter,
            separator.separate,
            extracted_audio_path,
        )
        # Register the outputs for cleanup before anything else can fail.
        # The names are treated as relative to tmp_directory, the output_dir
        # the separator was configured with.
        temp_paths.extend(f"{tmp_directory}/{name}" for name in stem_names)

        # Exactly two outputs are expected; any other count raises here and
        # is reported as a 500 like every other failure.
        primary_stem_output_path, _secondary_stem_output_path = stem_names

        with open(f"{tmp_directory}/{primary_stem_output_path}", "rb") as f:
            isolated_audio_data = f.read()

    except Exception as e:
        # logging.exception keeps the traceback, which logging.error dropped.
        logging.exception(f"An error occurred: {str(e)}")
        raise HTTPException(
            status_code=500, detail="An error occurred during vocal isolation"
        ) from e

    finally:
        # Each file is removed independently so one failed delete does not
        # leave the remaining files behind.
        for path in temp_paths:
            _remove_temp_file(path)

    return Response(content=isolated_audio_data, media_type="audio/wav")


async def extract_audio(
    media_url: str, start_seconds: float, duration_seconds: float, output_path: str
):
    """Cut a clip out of ``media_url`` and write it to ``output_path`` as WAV.

    ffmpeg is invoked on the ``ffmpegExecuter`` thread pool so the blocking
    subprocess does not stall the event loop. The elapsed time is logged at
    INFO level.

    Args:
        media_url: Input handed to ffmpeg unchanged.
        start_seconds: Seek position, passed to ffmpeg as ``ss``.
        duration_seconds: Clip length, passed to ffmpeg as ``t``.
        output_path: Destination file for the extracted audio.

    Any exception raised by the ffmpeg run propagates to the caller.
    """
    # Function-scope import keeps this block self-contained; a monotonic
    # clock is used so wall-clock adjustments cannot skew the measurement.
    import time

    # NOTE(review): media_url comes straight from the request body and is
    # given to ffmpeg unvalidated. ffmpeg inputs are not limited to http(s)
    # URLs (local paths and other protocols are accepted) — consider
    # restricting the allowed schemes before exposing this publicly.
    def run_ffmpeg():
        (
            ffmpeg.input(media_url, ss=start_seconds)
            .output(output_path, format="wav", t=duration_seconds)
            .global_args("-loglevel", "error", "-hide_banner")
            .global_args("-nostats")
            .run()
        )

    started = time.perf_counter()
    # Runs on the dedicated ffmpeg pool, not asyncio's default executor.
    await asyncio.get_running_loop().run_in_executor(ffmpegExecuter, run_ffmpeg)
    elapsed_seconds = time.perf_counter() - started

    logging.info(f"Audio extraction took {elapsed_seconds} seconds")


def scrape_subtitles(video_id, translate_to, translate_from):
    """Fetch subtitles for ``video_id`` in ``translate_to``, translating if needed.

    Strategies are tried in order and the first that succeeds wins:

    1. a transcript that already exists in ``translate_to``;
    2. the ``translate_from`` transcript translated to ``translate_to``;
    3. any other listed transcript translated to ``translate_to``.

    Args:
        video_id: Identifier passed to ``YouTubeTranscriptApi.list_transcripts``.
        translate_to: Language code of the desired subtitles.
        translate_from: Language code of the preferred source transcript.

    Returns:
        Whatever ``fetch()`` returns for the chosen transcript, or ``None``
        when no strategy produced one.

    Raises:
        Exception: anything raised while listing the transcripts propagates;
            failures of the individual strategies are treated as "try the
            next one" and are only logged at DEBUG level.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(
        video_id,
    )

    # ``except Exception`` (not a bare ``except``) keeps the best-effort
    # fallback chain while letting KeyboardInterrupt/SystemExit through.

    # see if translation already exists
    try:
        return transcript_list.find_transcript([translate_to]).fetch()
    except Exception as e:
        logging.debug("No direct %s transcript for %s: %s", translate_to, video_id, e)

    # find transcription in video language
    try:
        return (
            transcript_list.find_transcript([translate_from])
            .translate(translate_to)
            .fetch()
        )
    except Exception as e:
        logging.debug(
            "Could not translate %s transcript of %s to %s: %s",
            translate_from,
            video_id,
            translate_to,
            e,
        )

    # search for any other translatable languages
    for transcript in transcript_list:
        try:
            return transcript.translate(translate_to).fetch()
        except Exception as e:
            logging.debug(
                "Transcript of %s not translatable to %s: %s",
                video_id,
                translate_to,
                e,
            )

    return None


def format_language_code(lang: str) -> str:
    """Normalise a language tag to the form used for transcript lookups.

    A few exact codes are rewritten via a fixed table; every other tag is cut
    down to the part before its first ``-`` (``"en-US"`` becomes ``"en"``).

    NOTE(review): the table is consulted with the full tag only, so a region
    variant of a mapped code (``"he-IL"``, ``"zh-CN"``) comes back as the bare
    ``"he"`` / ``"zh"`` rather than the mapped value — confirm this is intended.
    """
    overrides = {"he": "iw", "zh": "zh-Hans", "zh-TW": "zh-Hant"}
    primary_subtag, _, _ = lang.partition("-")
    try:
        return overrides[lang]
    except KeyError:
        return primary_subtag


class SubtitleRequest(BaseModel):
    """Request body accepted by ``POST /subtitles``."""

    # Video identifier passed to YouTubeTranscriptApi.list_transcripts.
    video_id: str
    # Desired subtitle language; run through format_language_code before use.
    translate_to: str
    # Preferred source-transcript language; also run through format_language_code.
    translate_from: str


@app.post("/subtitles")
async def get_subtitles(request: SubtitleRequest):
    """Return subtitles for a video in the requested language as JSON.

    Both language codes are normalised with ``format_language_code`` and the
    blocking lookup runs in the thread pool via ``run_in_threadpool``.

    Returns:
        ``JSONResponse`` (200) with the subtitles, or a plain ``Response``
        with status 400 and body ``"Not available"`` when none were found.

    Raises:
        HTTPException: status 500 if the lookup raises.
    """
    try:
        subtitles = await run_in_threadpool(
            scrape_subtitles,
            request.video_id,
            format_language_code(request.translate_to),
            format_language_code(request.translate_from),
        )
        if subtitles is None:
            # Status 400 is kept because existing clients may depend on it.
            return Response("Not available", 400)
        return JSONResponse(subtitles, 200)
    except Exception as e:
        # logging.warn is a deprecated alias; exc_info keeps the traceback.
        logging.warning("%s", e, exc_info=True)
        raise HTTPException(
            status_code=500, detail="An error occurred while getting subtitles"
        ) from e


# if __name__ == "__main__":
#     import uvicorn

#     uvicorn.run(app, host="0.0.0.0", port=8000)