|
import sys |
|
import time |
|
from fastapi import FastAPI, BackgroundTasks, Request, HTTPException |
|
from fastapi.responses import FileResponse |
|
from fastapi.concurrency import run_in_threadpool |
|
import yt_dlp |
|
import ffmpeg |
|
import urllib.parse |
|
import os |
|
from datetime import datetime |
|
import schedule |
|
import requests |
|
import uvicorn |
|
import subprocess |
|
import json |
|
from dotenv import load_dotenv |
|
import mimetypes |
|
import tempfile |
|
from PIL import Image |
|
from io import BytesIO |
|
from pathlib import Path |
|
from fastapi.staticfiles import StaticFiles |
|
from starlette.concurrency import run_in_threadpool |
|
from starlette.responses import JSONResponse |
|
import logging |
|
import gc |
|
import re |
|
|
|
tmp_dir = tempfile.gettempdir() |
|
BASE_URL = "https://chrunos-prem.hf.space" |
|
|
|
|
|
load_dotenv() |
|
app = FastAPI() |
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
def env_to_cookies(env_content: str, output_file: str) -> None: |
|
"""Convert environment variable content back to cookie file""" |
|
try: |
|
|
|
if '="' not in env_content: |
|
raise ValueError("Invalid env content format") |
|
|
|
content = env_content.split('="', 1)[1].strip('"') |
|
|
|
|
|
cookie_content = content.replace('\\n', '\n') |
|
|
|
|
|
with open(output_file, 'w') as f: |
|
f.write(cookie_content) |
|
|
|
except Exception as e: |
|
raise ValueError(f"Error converting to cookie file: {str(e)}") |
|
|
|
def save_to_env_file(env_content: str, env_file: str = '.env') -> None: |
|
"""Save environment variable content to .env file""" |
|
try: |
|
with open(env_file, 'w') as f: |
|
f.write(env_content) |
|
|
|
except Exception as e: |
|
raise ValueError(f"Error saving to env file: {str(e)}") |
|
|
|
def env_to_cookies_from_env(output_file: str) -> None: |
|
"""Convert environment variable from .env file to cookie file""" |
|
try: |
|
load_dotenv() |
|
env_content = os.getenv('FIREFOX_COOKIES') |
|
|
|
if not env_content: |
|
raise ValueError("FIREFOX_COOKIES not found in .env file") |
|
|
|
env_to_cookies(f'FIREFOX_COOKIES="{env_content}"', output_file) |
|
except Exception as e: |
|
raise ValueError(f"Error converting to cookie file: {str(e)}") |
|
|
|
def get_cookies(): |
|
"""Get cookies from environment variable""" |
|
load_dotenv() |
|
cookie_content = os.getenv('FIREFOX_COOKIES') |
|
|
|
if not cookie_content: |
|
raise ValueError("FIREFOX_COOKIES environment variable not set") |
|
return cookie_content |
|
|
|
def create_temp_cookie_file(): |
|
"""Create temporary cookie file from environment variable""" |
|
temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt') |
|
try: |
|
cookie_content = get_cookies() |
|
|
|
cookie_content = cookie_content.replace('\\n', '\n') |
|
temp_cookie.write() |
|
temp_cookie.flush() |
|
return Path(temp_cookie.name) |
|
finally: |
|
temp_cookie.close() |
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get('/') |
|
def main(): |
|
return "Chrunos Downloader API Is Running." |
|
|
|
@app.get("/get_video_url") |
|
async def get_video_url(youtube_url: str): |
|
try: |
|
cookiefile = "firefox-cookies.txt" |
|
env_to_cookies_from_env("firefox-cookies.txt") |
|
|
|
|
|
ydl_opts = { |
|
'format': 'best', |
|
'quiet': True, |
|
|
|
'max_filesize': 50 * 1024 * 1024 |
|
} |
|
|
|
|
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
info = ydl.extract_info(youtube_url, download=False) |
|
return info |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
|
|
|
|
global_download_dir = tempfile.mkdtemp() |
|
|
|
@app.post("/high") |
|
async def download_high_video(request: Request): |
|
data = await request.json() |
|
video_url = data.get('url') |
|
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None |
|
quality = data.get('quality', '1080') |
|
logger.info(f'input: {video_url} {quality}') |
|
cookiefile = "firefox-cookies.txt" |
|
env_to_cookies_from_env("firefox-cookies.txt") |
|
|
|
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') |
|
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') |
|
|
|
|
|
height_map = { |
|
'480': 480, |
|
'720': 720, |
|
'1080': 1080, |
|
'1440': 1440, |
|
'2160': 2160 |
|
} |
|
max_height = height_map.get(quality, 1080) |
|
|
|
|
|
if max_height > 1080: |
|
|
|
format_str = f'bestvideo[height<={max_height}]/bestvideo[width<={max_height}]+bestaudio/best' |
|
else: |
|
|
|
long_edge = int(max_height * 1.8) |
|
format_str = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best' |
|
|
|
ydl_opts = { |
|
'format': format_str, |
|
'outtmpl': output_template, |
|
'quiet': True, |
|
'no_warnings': True, |
|
'noprogress': True, |
|
'merge_output_format': 'mp4' |
|
} |
|
|
|
if is_youtube_url: |
|
ydl_opts["cookiefile"] = "firefox-cookies.txt" |
|
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) |
|
|
|
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) |
|
if not downloaded_files: |
|
return {"error": "Download failed"} |
|
|
|
downloaded_file = downloaded_files[0] |
|
encoded_filename = urllib.parse.quote(downloaded_file.name) |
|
download_url = f"{BASE_URL}/file/{encoded_filename}" |
|
gc.collect() |
|
return {"url": download_url} |
|
|
|
''' try: |
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl: |
|
info = ydl.extract_info(video_url, download=False) |
|
available_formats = info.get('formats', []) |
|
|
|
# 过滤出视频格式(排除音频和故事板) |
|
video_formats = [f for f in available_formats if 'height' in f and f['height'] is not None] |
|
# 根据高度排序 |
|
video_formats.sort(key=lambda x: x['height'], reverse=True) |
|
|
|
selected_format = None |
|
for fmt in video_formats: |
|
if fmt['height'] <= max_height: |
|
selected_format = fmt['format_id'] |
|
break |
|
|
|
if not selected_format: |
|
# 如果没有合适的格式,选择最大的格式 |
|
if video_formats: |
|
selected_format = video_formats[0]['format_id'] |
|
else: |
|
return {"error": "No suitable video format available"} |
|
|
|
ydl_opts['format'] = f'{selected_format}+bestaudio/best' |
|
await run_in_threadpool(lambda: ydl.download([video_url])) |
|
|
|
except yt_dlp.DownloadError as e: |
|
return {"error": f"Download failed: {str(e)}"} |
|
|
|
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) |
|
if not downloaded_files: |
|
return {"error": "Download failed"} |
|
downloaded_file = downloaded_files[0] |
|
encoded_filename = urllib.parse.quote(downloaded_file.name) |
|
download_url = f"{BASE_URL}/file/{encoded_filename}" |
|
gc.collect() |
|
return {"url": download_url} |
|
''' |
|
|
|
|
|
|
|
|
|
@app.post("/max") |
|
async def download_high_quality_video(request: Request): |
|
data = await request.json() |
|
video_url = data.get('url') |
|
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None |
|
|
|
quality = data.get('quality', '1080') |
|
logger.info(f'input: {video_url} {quality}') |
|
cookiefile = "firefox-cookies.txt" |
|
env_to_cookies_from_env("firefox-cookies.txt") |
|
|
|
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') |
|
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') |
|
|
|
|
|
height_map = { |
|
'480': 480, |
|
'720': 720, |
|
'1080': 1080, |
|
'1440': 1440, |
|
'2160': 2160 |
|
} |
|
max_height = height_map.get(quality, 1080) |
|
if max_height > 1080: |
|
format_str = f'bestvideo[height<={max_height}]+bestaudio/best' |
|
else: |
|
format_str = f'bestvideo[height<={max_height}][vcodec^=h264]+bestaudio/best' |
|
|
|
ydl_opts = { |
|
'format': format_str, |
|
'outtmpl': output_template, |
|
'quiet': True, |
|
'no_warnings': True, |
|
'noprogress': True, |
|
'merge_output_format': 'mp4' |
|
} |
|
|
|
if is_youtube_url: |
|
ydl_opts["cookiefile"] = "firefox-cookies.txt" |
|
|
|
|
|
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) |
|
|
|
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4")) |
|
if not downloaded_files: |
|
return {"error": "Download failed"} |
|
|
|
downloaded_file = downloaded_files[0] |
|
encoded_filename = urllib.parse.quote(downloaded_file.name) |
|
download_url = f"{BASE_URL}/file/{encoded_filename}" |
|
gc.collect() |
|
return {"url": download_url} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/audio") |
|
async def download_audio(request: Request): |
|
data = await request.json() |
|
video_url = data.get('url') |
|
logging.info(f"Received URL: {video_url}") |
|
cookiefile = "firefox-cookies.txt" |
|
env_to_cookies_from_env("firefox-cookies.txt") |
|
|
|
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') |
|
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') |
|
|
|
ydl_opts = { |
|
'format': 'bestaudio/best', |
|
'outtmpl': output_template, |
|
'quiet': True, |
|
'no_warnings': True, |
|
'noprogress': True, |
|
'cookiefile': cookiefile, |
|
'postprocessors': [{ |
|
'key': 'FFmpegExtractAudio', |
|
'preferredcodec': 'mp3', |
|
'preferredquality': '192' |
|
}] |
|
} |
|
|
|
try: |
|
logging.info("Starting yt-dlp download...") |
|
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url])) |
|
logging.info("yt-dlp download completed") |
|
except yt_dlp.utils.DownloadError as e: |
|
error_message = str(e) |
|
logging.error(f"yt-dlp error: {error_message}") |
|
return JSONResponse(content={"error": error_message}, status_code=500) |
|
except Exception as e: |
|
error_message = str(e) |
|
logging.error(f"General error: {error_message}") |
|
return JSONResponse(content={"error": error_message}, status_code=500) |
|
|
|
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3")) |
|
if not downloaded_files: |
|
logging.error("Download failed: No MP3 files found") |
|
return JSONResponse(content={"error": "Download failed"}, status_code=500) |
|
|
|
downloaded_file = downloaded_files[0] |
|
encoded_filename = urllib.parse.quote(downloaded_file.name) |
|
download_url = f"{BASE_URL}/file/{encoded_filename}" |
|
logging.info(f"Download URL: {download_url}") |
|
|
|
|
|
logging.info("Preparing to send response back to the client") |
|
gc.collect() |
|
return JSONResponse(content={"url": download_url}, status_code=200) |
|
|
|
@app.post("/search") |
|
async def search_and_download_song(request: Request): |
|
data = await request.json() |
|
song_name = data.get('songname') |
|
artist_name = data.get('artist') |
|
if artist_name: |
|
search_query = f"ytsearch:{song_name} {artist_name}" |
|
else: |
|
search_query = f"ytsearch:{song_name}" |
|
|
|
logging.info(f"Search query: {search_query}") |
|
cookiefile = "firefox-cookies.txt" |
|
env_to_cookies_from_env("firefox-cookies.txt") |
|
|
|
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') |
|
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s') |
|
|
|
ydl_opts = { |
|
'format': 'bestaudio/best', |
|
'outtmpl': output_template, |
|
'quiet': True, |
|
'no_warnings': True, |
|
'noprogress': True, |
|
'postprocessors': [{ |
|
'key': 'FFmpegExtractAudio', |
|
'preferredcodec': 'mp3', |
|
'preferredquality': '192' |
|
}], |
|
'cookiefile': cookiefile |
|
} |
|
|
|
try: |
|
logging.info("Starting yt-dlp search and download...") |
|
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([search_query])) |
|
logging.info("yt-dlp search and download completed") |
|
except yt_dlp.utils.DownloadError as e: |
|
error_message = str(e) |
|
logging.error(f"yt-dlp error: {error_message}") |
|
return JSONResponse(content={"error": error_message}, status_code=500) |
|
except Exception as e: |
|
error_message = str(e) |
|
logging.error(f"General error: {error_message}") |
|
return JSONResponse(content={"error": error_message}, status_code=500) |
|
|
|
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3")) |
|
if not downloaded_files: |
|
logging.error("Download failed: No MP3 files found") |
|
return JSONResponse(content={"error": "Download failed"}, status_code=500) |
|
|
|
downloaded_file = downloaded_files[0] |
|
encoded_filename = urllib.parse.quote(downloaded_file.name) |
|
download_url = f"{BASE_URL}/file/{encoded_filename}" |
|
logging.info(f"Download URL: {download_url}") |
|
|
|
|
|
logging.info("Preparing to send response back to the client") |
|
gc.collect() |
|
return JSONResponse(content={"url": download_url}, status_code=200) |
|
|
|
|
|
|
|
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads") |
|
|
|
|
|
|