File size: 14,099 Bytes
93dccb8 0528943 93dccb8 7287be3 93dccb8 f2203e4 a2b2691 93dccb8 90529aa 93dccb8 0528943 93dccb8 dfd3ba0 93dccb8 7287be3 18f5257 9963a4c 7bb8d49 6757c8a 8dd2969 7287be3 6efc26b 7287be3 0f23ac3 5abdaae 66a7997 5abdaae 385afe8 93dccb8 5abdaae 93dccb8 c31210a 385afe8 93dccb8 3c79d0b 5d2147d 3c79d0b 93dccb8 385afe8 c31210a 94a5a0e ba029ff 93dccb8 7287be3 6eaa917 a810c3a 7287be3 dc07bdb dc175eb 17f1075 dc175eb b2338e0 dc175eb 17f1075 dc175eb 17f1075 dc175eb 80f3afb dc175eb 17f1075 dfd3ba0 b2338e0 dfd3ba0 d85b988 9b3f39b dfd3ba0 9b3f39b 11c344a d85b988 a810c3a d85b988 83369b5 dc07bdb 83369b5 5d2147d 6757c8a 83369b5 9b3f39b 7287be3 a2e8828 159cf74 d762bd0 a2e8828 8dd2969 504d610 9fc2310 0d156b5 a2e8828 159cf74 a2e8828 504d610 9fc2310 869d9da 504d610 a2e8828 9fc2310 a2e8828 8dd2969 a2e8828 543ebf8 8dd2969 77bcddf a2e8828 159cf74 763f926 a2e8828 159cf74 31d5632 a2e8828 6757c8a a2e8828 8645b1a b2d8ac4 6548bd3 bd902f8 6548bd3 3e0f889 42c98ad 3e0f889 6548bd3 3e0f889 504d610 3e0f889 6c107e7 bd902f8 3e0f889 8645b1a 6548bd3 8645b1a 6548bd3 8645b1a 6548bd3 bd902f8 6548bd3 bd902f8 b4c6121 8645b1a b4c6121 6548bd3 bd902f8 0320aa5 b4c6121 6548bd3 bd902f8 6757c8a bd902f8 86916d5 66cc730 86916d5 056074f 86916d5 056074f 86916d5 6757c8a 86916d5 8645b1a 6eaa917 7287be3 a478a5a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 |
import sys
import time
from fastapi import FastAPI, BackgroundTasks, Request, HTTPException
from fastapi.responses import FileResponse
from fastapi.concurrency import run_in_threadpool
import yt_dlp
import ffmpeg
import urllib.parse
import os
from datetime import datetime
import schedule
import requests
import uvicorn
import subprocess
import json
from dotenv import load_dotenv
import mimetypes
import tempfile
from PIL import Image
from io import BytesIO
from pathlib import Path
from fastapi.staticfiles import StaticFiles
from starlette.concurrency import run_in_threadpool
from starlette.responses import JSONResponse
import logging
import gc
import re
tmp_dir = tempfile.gettempdir()
BASE_URL = "https://chrunos-prem.hf.space"
load_dotenv()
app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def env_to_cookies(env_content: str, output_file: str) -> None:
"""Convert environment variable content back to cookie file"""
try:
# Extract content from env format
if '="' not in env_content:
raise ValueError("Invalid env content format")
content = env_content.split('="', 1)[1].strip('"')
# Replace escaped newlines with actual newlines
cookie_content = content.replace('\\n', '\n')
# Write to cookie file
with open(output_file, 'w') as f:
f.write(cookie_content)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def save_to_env_file(env_content: str, env_file: str = '.env') -> None:
"""Save environment variable content to .env file"""
try:
with open(env_file, 'w') as f:
f.write(env_content)
#print(f"Successfully saved to {env_file}")
except Exception as e:
raise ValueError(f"Error saving to env file: {str(e)}")
def env_to_cookies_from_env(output_file: str) -> None:
"""Convert environment variable from .env file to cookie file"""
try:
load_dotenv() # Load from .env file
env_content = os.getenv('FIREFOX_COOKIES')
#print(f"Printing env content: \n{env_content}")
if not env_content:
raise ValueError("FIREFOX_COOKIES not found in .env file")
env_to_cookies(f'FIREFOX_COOKIES="{env_content}"', output_file)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def get_cookies():
"""Get cookies from environment variable"""
load_dotenv()
cookie_content = os.getenv('FIREFOX_COOKIES')
#print(cookie_content)
if not cookie_content:
raise ValueError("FIREFOX_COOKIES environment variable not set")
return cookie_content
def create_temp_cookie_file():
"""Create temporary cookie file from environment variable"""
temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt')
try:
cookie_content = get_cookies()
# Replace escaped newlines with actual newlines
cookie_content = cookie_content.replace('\\n', '\n')
temp_cookie.write()
temp_cookie.flush()
return Path(temp_cookie.name)
finally:
temp_cookie.close()
@app.get('/')
def main():
return "Chrunos Downloader API Is Running."
@app.get("/get_video_url")
async def get_video_url(youtube_url: str):
try:
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
# Add cookies
ydl_opts = {
'format': 'best',
'quiet': True,
#'outtmpl': f'{VIDEO_DIR}/%(id)s.%(ext)s',
'max_filesize': 50 * 1024 * 1024
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(youtube_url, download=False)
return info
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Define a global temporary download directory
global_download_dir = tempfile.mkdtemp()
@app.post("/high")
async def download_high_video(request: Request):
data = await request.json()
video_url = data.get('url')
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
quality = data.get('quality', '1080')
logger.info(f'input: {video_url} {quality}')
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
# 转换质量字符串为数值
height_map = {
'480': 480,
'720': 720,
'1080': 1080,
'1440': 1440,
'2160': 2160
}
max_height = height_map.get(quality, 1080)
# 为高分辨率视频(>1080p)不限制编码器,对较低分辨率优先选择H.264
if max_height > 1080:
# 支持横屏和竖屏视频 - 对于更高分辨率不限制编码器
format_str = f'bestvideo[height<={max_height}]/bestvideo[width<={max_height}]+bestaudio/best'
else:
# 优先选择H.264编码,同时支持横屏和竖屏(长边不超过指定分辨率的1.8倍)
long_edge = int(max_height * 1.8) # 假设长边与短边比例约为16:9
format_str = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best'
ydl_opts = {
'format': format_str,
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
if is_youtube_url:
ydl_opts["cookiefile"] = "firefox-cookies.txt"
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
''' try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=False)
available_formats = info.get('formats', [])
# 过滤出视频格式(排除音频和故事板)
video_formats = [f for f in available_formats if 'height' in f and f['height'] is not None]
# 根据高度排序
video_formats.sort(key=lambda x: x['height'], reverse=True)
selected_format = None
for fmt in video_formats:
if fmt['height'] <= max_height:
selected_format = fmt['format_id']
break
if not selected_format:
# 如果没有合适的格式,选择最大的格式
if video_formats:
selected_format = video_formats[0]['format_id']
else:
return {"error": "No suitable video format available"}
ydl_opts['format'] = f'{selected_format}+bestaudio/best'
await run_in_threadpool(lambda: ydl.download([video_url]))
except yt_dlp.DownloadError as e:
return {"error": f"Download failed: {str(e)}"}
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
'''
@app.post("/max")
async def download_high_quality_video(request: Request):
data = await request.json()
video_url = data.get('url')
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
# Get quality from request, default to 1080p if not specified
quality = data.get('quality', '1080') # Can be '480', '720', '1080', '1440', '2160' (4K)
logger.info(f'input: {video_url} {quality}')
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
# Convert quality string to height
height_map = {
'480': 480,
'720': 720,
'1080': 1080,
'1440': 1440,
'2160': 2160
}
max_height = height_map.get(quality, 1080)
if max_height > 1080:
format_str = f'bestvideo[height<={max_height}]+bestaudio/best'
else:
format_str = f'bestvideo[height<={max_height}][vcodec^=h264]+bestaudio/best'
ydl_opts = {
'format': format_str,
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
if is_youtube_url:
ydl_opts["cookiefile"] = "firefox-cookies.txt"
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
@app.post("/audio")
async def download_audio(request: Request):
data = await request.json()
video_url = data.get('url')
logging.info(f"Received URL: {video_url}")
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'cookiefile': cookiefile,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'
}]
}
try:
logging.info("Starting yt-dlp download...")
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
logging.info("yt-dlp download completed")
except yt_dlp.utils.DownloadError as e:
error_message = str(e)
logging.error(f"yt-dlp error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
except Exception as e:
error_message = str(e)
logging.error(f"General error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
if not downloaded_files:
logging.error("Download failed: No MP3 files found")
return JSONResponse(content={"error": "Download failed"}, status_code=500)
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
logging.info(f"Download URL: {download_url}")
# Log just before returning the response
logging.info("Preparing to send response back to the client")
gc.collect()
return JSONResponse(content={"url": download_url}, status_code=200)
@app.post("/search")
async def search_and_download_song(request: Request):
data = await request.json()
song_name = data.get('songname')
artist_name = data.get('artist')
if artist_name:
search_query = f"ytsearch:{song_name} {artist_name}"
else:
search_query = f"ytsearch:{song_name}"
logging.info(f"Search query: {search_query}")
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'
}],
'cookiefile': cookiefile
}
try:
logging.info("Starting yt-dlp search and download...")
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([search_query]))
logging.info("yt-dlp search and download completed")
except yt_dlp.utils.DownloadError as e:
error_message = str(e)
logging.error(f"yt-dlp error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
except Exception as e:
error_message = str(e)
logging.error(f"General error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
if not downloaded_files:
logging.error("Download failed: No MP3 files found")
return JSONResponse(content={"error": "Download failed"}, status_code=500)
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
logging.info(f"Download URL: {download_url}")
# Log just before returning the response
logging.info("Preparing to send response back to the client")
gc.collect()
return JSONResponse(content={"url": download_url}, status_code=200)
# Mount the static files directory
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")
|