prem / app.py
Chrunos's picture
Update app.py
6efc26b verified
import sys
import time
from fastapi import FastAPI, BackgroundTasks, Request, HTTPException
from fastapi.responses import FileResponse
from fastapi.concurrency import run_in_threadpool
import yt_dlp
import ffmpeg
import urllib.parse
import os
from datetime import datetime
import schedule
import requests
import uvicorn
import subprocess
import json
from dotenv import load_dotenv
import mimetypes
import tempfile
from PIL import Image
from io import BytesIO
from pathlib import Path
from fastapi.staticfiles import StaticFiles
from starlette.concurrency import run_in_threadpool
from starlette.responses import JSONResponse
import logging
import gc
import re
tmp_dir = tempfile.gettempdir()
BASE_URL = "https://chrunos-prem.hf.space"
load_dotenv()
app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def env_to_cookies(env_content: str, output_file: str) -> None:
"""Convert environment variable content back to cookie file"""
try:
# Extract content from env format
if '="' not in env_content:
raise ValueError("Invalid env content format")
content = env_content.split('="', 1)[1].strip('"')
# Replace escaped newlines with actual newlines
cookie_content = content.replace('\\n', '\n')
# Write to cookie file
with open(output_file, 'w') as f:
f.write(cookie_content)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def save_to_env_file(env_content: str, env_file: str = '.env') -> None:
"""Save environment variable content to .env file"""
try:
with open(env_file, 'w') as f:
f.write(env_content)
#print(f"Successfully saved to {env_file}")
except Exception as e:
raise ValueError(f"Error saving to env file: {str(e)}")
def env_to_cookies_from_env(output_file: str) -> None:
"""Convert environment variable from .env file to cookie file"""
try:
load_dotenv() # Load from .env file
env_content = os.getenv('FIREFOX_COOKIES')
#print(f"Printing env content: \n{env_content}")
if not env_content:
raise ValueError("FIREFOX_COOKIES not found in .env file")
env_to_cookies(f'FIREFOX_COOKIES="{env_content}"', output_file)
except Exception as e:
raise ValueError(f"Error converting to cookie file: {str(e)}")
def get_cookies():
"""Get cookies from environment variable"""
load_dotenv()
cookie_content = os.getenv('FIREFOX_COOKIES')
#print(cookie_content)
if not cookie_content:
raise ValueError("FIREFOX_COOKIES environment variable not set")
return cookie_content
def create_temp_cookie_file():
"""Create temporary cookie file from environment variable"""
temp_cookie = tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.txt')
try:
cookie_content = get_cookies()
# Replace escaped newlines with actual newlines
cookie_content = cookie_content.replace('\\n', '\n')
temp_cookie.write()
temp_cookie.flush()
return Path(temp_cookie.name)
finally:
temp_cookie.close()
@app.get('/')
def main():
return "Chrunos Downloader API Is Running."
@app.get("/get_video_url")
async def get_video_url(youtube_url: str):
try:
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
# Add cookies
ydl_opts = {
'format': 'best',
'quiet': True,
#'outtmpl': f'{VIDEO_DIR}/%(id)s.%(ext)s',
'max_filesize': 50 * 1024 * 1024
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(youtube_url, download=False)
return info
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Define a global temporary download directory
global_download_dir = tempfile.mkdtemp()
@app.post("/high")
async def download_high_video(request: Request):
data = await request.json()
video_url = data.get('url')
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
quality = data.get('quality', '1080')
logger.info(f'input: {video_url} {quality}')
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
# 转换质量字符串为数值
height_map = {
'480': 480,
'720': 720,
'1080': 1080,
'1440': 1440,
'2160': 2160
}
max_height = height_map.get(quality, 1080)
# 为高分辨率视频(>1080p)不限制编码器,对较低分辨率优先选择H.264
if max_height > 1080:
# 支持横屏和竖屏视频 - 对于更高分辨率不限制编码器
format_str = f'bestvideo[height<={max_height}]/bestvideo[width<={max_height}]+bestaudio/best'
else:
# 优先选择H.264编码,同时支持横屏和竖屏(长边不超过指定分辨率的1.8倍)
long_edge = int(max_height * 1.8) # 假设长边与短边比例约为16:9
format_str = f'bestvideo[vcodec^=avc][height<={long_edge}][width<={long_edge}]+bestaudio/best'
ydl_opts = {
'format': format_str,
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
if is_youtube_url:
ydl_opts["cookiefile"] = "firefox-cookies.txt"
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
''' try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(video_url, download=False)
available_formats = info.get('formats', [])
# 过滤出视频格式(排除音频和故事板)
video_formats = [f for f in available_formats if 'height' in f and f['height'] is not None]
# 根据高度排序
video_formats.sort(key=lambda x: x['height'], reverse=True)
selected_format = None
for fmt in video_formats:
if fmt['height'] <= max_height:
selected_format = fmt['format_id']
break
if not selected_format:
# 如果没有合适的格式,选择最大的格式
if video_formats:
selected_format = video_formats[0]['format_id']
else:
return {"error": "No suitable video format available"}
ydl_opts['format'] = f'{selected_format}+bestaudio/best'
await run_in_threadpool(lambda: ydl.download([video_url]))
except yt_dlp.DownloadError as e:
return {"error": f"Download failed: {str(e)}"}
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
'''
@app.post("/max")
async def download_high_quality_video(request: Request):
data = await request.json()
video_url = data.get('url')
is_youtube_url = re.search(r'(youtube\.com|youtu\.be)', video_url) is not None
# Get quality from request, default to 1080p if not specified
quality = data.get('quality', '1080') # Can be '480', '720', '1080', '1440', '2160' (4K)
logger.info(f'input: {video_url} {quality}')
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
# Convert quality string to height
height_map = {
'480': 480,
'720': 720,
'1080': 1080,
'1440': 1440,
'2160': 2160
}
max_height = height_map.get(quality, 1080)
if max_height > 1080:
format_str = f'bestvideo[height<={max_height}]+bestaudio/best'
else:
format_str = f'bestvideo[height<={max_height}][vcodec^=h264]+bestaudio/best'
ydl_opts = {
'format': format_str,
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'merge_output_format': 'mp4'
}
if is_youtube_url:
ydl_opts["cookiefile"] = "firefox-cookies.txt"
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp4"))
if not downloaded_files:
return {"error": "Download failed"}
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
gc.collect()
return {"url": download_url}
@app.post("/audio")
async def download_audio(request: Request):
data = await request.json()
video_url = data.get('url')
logging.info(f"Received URL: {video_url}")
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'cookiefile': cookiefile,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'
}]
}
try:
logging.info("Starting yt-dlp download...")
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([video_url]))
logging.info("yt-dlp download completed")
except yt_dlp.utils.DownloadError as e:
error_message = str(e)
logging.error(f"yt-dlp error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
except Exception as e:
error_message = str(e)
logging.error(f"General error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
if not downloaded_files:
logging.error("Download failed: No MP3 files found")
return JSONResponse(content={"error": "Download failed"}, status_code=500)
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
logging.info(f"Download URL: {download_url}")
# Log just before returning the response
logging.info("Preparing to send response back to the client")
gc.collect()
return JSONResponse(content={"url": download_url}, status_code=200)
@app.post("/search")
async def search_and_download_song(request: Request):
data = await request.json()
song_name = data.get('songname')
artist_name = data.get('artist')
if artist_name:
search_query = f"ytsearch:{song_name} {artist_name}"
else:
search_query = f"ytsearch:{song_name}"
logging.info(f"Search query: {search_query}")
cookiefile = "firefox-cookies.txt"
env_to_cookies_from_env("firefox-cookies.txt")
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
output_template = str(Path(global_download_dir) / f'%(title)s_{timestamp}.%(ext)s')
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': output_template,
'quiet': True,
'no_warnings': True,
'noprogress': True,
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192'
}],
'cookiefile': cookiefile
}
try:
logging.info("Starting yt-dlp search and download...")
await run_in_threadpool(lambda: yt_dlp.YoutubeDL(ydl_opts).download([search_query]))
logging.info("yt-dlp search and download completed")
except yt_dlp.utils.DownloadError as e:
error_message = str(e)
logging.error(f"yt-dlp error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
except Exception as e:
error_message = str(e)
logging.error(f"General error: {error_message}")
return JSONResponse(content={"error": error_message}, status_code=500)
downloaded_files = list(Path(global_download_dir).glob(f"*_{timestamp}.mp3"))
if not downloaded_files:
logging.error("Download failed: No MP3 files found")
return JSONResponse(content={"error": "Download failed"}, status_code=500)
downloaded_file = downloaded_files[0]
encoded_filename = urllib.parse.quote(downloaded_file.name)
download_url = f"{BASE_URL}/file/{encoded_filename}"
logging.info(f"Download URL: {download_url}")
# Log just before returning the response
logging.info("Preparing to send response back to the client")
gc.collect()
return JSONResponse(content={"url": download_url}, status_code=200)
# Mount the static files directory
app.mount("/file", StaticFiles(directory=global_download_dir), name="downloads")