Spaces:
Paused
Paused
File size: 7,394 Bytes
c16a803 5296733 c16a803 40446f2 e4ed31e 6c154f8 c16a803 8c8c808 b1ae044 c16a803 40446f2 6c154f8 c16a803 6c154f8 79683ca 6c154f8 79683ca c047e75 e4ed31e cbd85fc 6c154f8 3c753ee 8c8c808 712c716 6c154f8 712c716 6c154f8 79683ca 6c154f8 6f1f7b0 30f78b0 f3b763e 49ea8e3 88ed798 c16a803 5a36eaa c16a803 6f1f7b0 88ed798 0212a08 b1ae044 e280197 88ed798 bd750f5 88ed798 5a36eaa b47b30e 5a36eaa 30f78b0 88ed798 88280cc d3e2a95 88280cc 88ed798 88280cc 88ed798 88280cc 88ed798 b1ae044 8c8c808 b1ae044 88ed798 e280197 30f78b0 b1ae044 30f78b0 c358ea4 30f78b0 6c154f8 88ed798 0c96b87 6c154f8 0054dff 95bc252 0054dff 88ed798 6c154f8 b1ae044 0054dff b7e494d 8c8c808 15d3f13 8c8c808 0054dff 0c96b87 95bc252 9412aac b7e494d 6c154f8 c358ea4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import logging
import shutil
import tempfile
import subprocess
from pathlib import Path
from moviepy.editor import VideoFileClip
import gradio as gr
import requests
from urllib.parse import urlparse
from ftplib import FTP
# Configure the root logger once at import time so INFO-level (and higher)
# messages emitted by the functions below are shown.
logging.basicConfig(level=logging.INFO)
def download_file(url, destination, timeout=60):
    """Download the resource at *url* and store it at *destination*.

    The body is streamed to disk in chunks so a large video is never held
    in memory as a whole.

    Args:
        url: Address of the file to fetch.
        destination: Path of the local file to (over)write.
        timeout: Seconds to wait for the server to connect or to send data
            before giving up. This bounds each network wait, not the total
            download time.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On connection problems or timeouts.
    """
    # Without a timeout, requests waits forever on an unresponsive server.
    with requests.get(url, stream=True, timeout=timeout) as response:
        # Check the status before opening the file so a failed request
        # does not leave an empty file behind.
        response.raise_for_status()
        with open(destination, 'wb') as f:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
def get_input_path(video_file, video_url, temp_dir):
    """Return the local path of the input video, downloading it if needed.

    An uploaded file takes precedence over a URL.

    Args:
        video_file: Uploaded file object exposing a ``name`` attribute with
            its path on disk, or ``None`` when nothing was uploaded.
        video_url: URL to download the video from when no file was uploaded.
        temp_dir: ``Path`` of the directory that receives the download.

    Returns:
        ``Path`` to the video on the local file system.

    Raises:
        ValueError: If neither a file nor a URL was supplied.
    """
    if video_file is not None:
        return Path(video_file.name)

    if video_url:
        file_name = Path(urlparse(video_url).path).name
        if not file_name:
            # A URL such as "http://host/" has no final path component; an
            # empty name would make the destination the directory itself.
            file_name = "downloaded_video"
        destination = temp_dir / file_name
        download_file(video_url, destination)
        return destination

    raise ValueError("No input was provided.")
def upload_to_ftp(server, username, password, source_file, destination_file):
    """Send a local file to an FTP server in binary mode.

    Args:
        server: Host name of the FTP server.
        username: Login user.
        password: Login password.
        source_file: Path of the local file to read.
        destination_file: Remote path used in the ``STOR`` command.
    """
    # Both the connection and the local file are released when the block
    # exits, whether or not the transfer succeeded.
    with FTP(server, username, password) as connection, open(source_file, 'rb') as payload:
        connection.storbinary(f'STOR {destination_file}', payload)
def get_output_path(input_path, temp_dir, res):
    """Build the playlist path for one rendition of the input video.

    The file name is the input's stem followed by ``_<res>.m3u8`` and it is
    placed inside *temp_dir*. Nothing is created on disk.

    Args:
        input_path: Path of the source video; only its stem is used.
        temp_dir: ``Path`` of the directory that will hold the playlist.
        res: Rendition label appended to the stem (for example ``"720p"``).

    Returns:
        ``Path`` of the rendition's ``.m3u8`` file.
    """
    source_stem = Path(input_path).stem
    return temp_dir / f"{source_stem}_{res}.m3u8"
def get_aspect_ratio(input_path, aspect_ratio):
    """Return the aspect ratio to use, deriving it from the video if unset.

    Args:
        input_path: Path of the video file to inspect.
        aspect_ratio: Ratio chosen by the caller, or ``None`` to compute one.

    Returns:
        *aspect_ratio* unchanged when it is not ``None``; otherwise the
        string ``"<width>:<height>"`` built from the clip's frame size.
    """
    if aspect_ratio is not None:
        return aspect_ratio

    video = VideoFileClip(str(input_path))
    try:
        width, height = video.size[0], video.size[1]
    finally:
        # Release the clip's reader; otherwise it stays open after the
        # size has been read.
        video.close()
    return f"{width}:{height}"
def create_master_playlist(output_paths, temp_dir):
    """Write ``master_playlist.m3u8`` listing every rendition playlist.

    Each entry advertises a fixed bandwidth of 1,000,000 and uses the text
    after the last underscore of the playlist's stem as its RESOLUTION
    value, followed by the playlist's file name on the next line.

    Args:
        output_paths: ``Path`` objects of the rendition playlists, in the
            order they should be listed.
        temp_dir: ``Path`` of the directory that receives the master file.

    Returns:
        ``Path`` of the written master playlist.
    """
    master_playlist_path = temp_dir / "master_playlist.m3u8"
    bandwidth = 1000 * 1000
    with open(master_playlist_path, 'w') as playlist:
        playlist.write("#EXTM3U\n")
        for variant in output_paths:
            resolution_tag = variant.stem.split('_')[-1]
            stream_info = f"#EXT-X-STREAM-INF:BANDWIDTH={bandwidth},RESOLUTION={resolution_tag}\n"
            playlist.write(stream_info + f"{variant.name}\n")
    return master_playlist_path
def _collect_hls_segments(playlist_path):
    """Return the ``.ts`` segment files written next to *playlist_path*."""
    # Relies on ffmpeg's default HLS segment naming (playlist name followed
    # by a sequence number and ".ts"), since no custom segment file name is
    # passed on the command line.
    prefix = playlist_path.stem
    return sorted(
        candidate
        for candidate in playlist_path.parent.iterdir()
        if candidate.suffix == ".ts" and candidate.name.startswith(prefix)
    )


def convert_video(video_file, quality, aspect_ratio, video_url,
                  ftp_server=None, ftp_username=None, ftp_password=None,
                  ftp_path=None, upload=False):
    """Transcode a video into HLS renditions and optionally upload them.

    One rendition is produced for every standard height that does not
    exceed the source height. The rendition playlists, a master playlist
    and the media segments are copied to the system temp directory so they
    outlive the working directory; with *upload* set they are also sent to
    the FTP server.

    Args:
        video_file: Uploaded file object, or ``None`` to use *video_url*.
        quality: Value passed to ffmpeg's ``-crf`` option.
        aspect_ratio: Value inserted into the ``setsar`` filter, or ``None``
            to derive ``width:height`` from the source.
        video_url: URL to download the source from when no file is given.
        ftp_server: FTP host; required when *upload* is true.
        ftp_username: FTP login user.
        ftp_password: FTP login password.
        ftp_path: Remote directory the files are stored under.
        upload: Whether to upload the results via FTP.

    Returns:
        On success, a list with the master playlist first followed by the
        rendition playlists: local paths, or remote paths when *upload* is
        true. On failure, a string: either an error message or the path of
        a text file holding ffmpeg's error output.

    Raises:
        ValueError: If neither *video_file* nor *video_url* is provided.
    """
    # Candidate output heights in pixels, from 8K down to 144p.
    standard_resolutions = [4320, 2160, 1440, 1080, 720, 480, 360, 240, 144]

    # Fail before doing any transcoding work that could not be delivered.
    if upload and not ftp_server:
        return "An FTP server is required when upload is enabled."

    export_dir = tempfile.gettempdir()

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir = Path(temp_dir)
        input_path = get_input_path(video_file, video_url, temp_dir)
        aspect_ratio = get_aspect_ratio(input_path, aspect_ratio)

        video = VideoFileClip(str(input_path))
        try:
            original_height = video.size[1]
        finally:
            # Only the height is needed; release the reader right away.
            video.close()

        output_paths = []
        for res in standard_resolutions:
            # Never upscale: skip renditions taller than the source.
            if res > original_height:
                continue

            scale = f"-1:{res}"  # fix the height, let ffmpeg pick the width
            output_path = get_output_path(input_path, temp_dir, f"{res}p")
            # NOTE(review): the filter uses setsar (sample aspect ratio) with a
            # display-style ratio such as "16:9" - confirm setdar was not intended.
            ffmpeg_command = [
                "ffmpeg", "-i", str(input_path), "-c:v", "libx264", "-crf", str(quality),
                "-vf", f"scale={scale}:force_original_aspect_ratio=decrease,pad=ceil(iw/2)*2:ceil(ih/2)*2,setsar={aspect_ratio}",
                "-hls_time", "6",
                "-hls_playlist_type", "vod", "-f", "hls", str(output_path),
            ]

            try:
                subprocess.run(ffmpeg_command, check=True, timeout=600, capture_output=True, text=True)
            except subprocess.CalledProcessError as e:
                logging.error("ffmpeg command failed with the following error:\n%s", e.stderr)
                error_file_path = str(Path(tempfile.gettempdir()) / "error.txt")
                with open(error_file_path, 'w') as error_file:
                    error_file.write("ffmpeg command failed with the following error:\n" + e.stderr)
                return error_file_path
            except subprocess.TimeoutExpired:
                logging.exception("ffmpeg command timed out.")
                return "ffmpeg command timed out."
            except FileNotFoundError:
                logging.exception("ffmpeg is not installed.")
                return "ffmpeg is not installed."
            except Exception as e:
                logging.exception("An error occurred.")
                return f"An error occurred: {str(e)}"

            output_paths.append(output_path)

        if not output_paths:
            return "The video is smaller than the smallest standard resolution."

        master_playlist_path = create_master_playlist(output_paths, temp_dir)

        # Gather the segments before the working directory is removed; the
        # playlists are useless without the media they point to.
        segment_paths = [
            segment
            for playlist in output_paths
            for segment in _collect_hls_segments(playlist)
        ]

        output_copy_paths = [shutil.copy2(path, export_dir) for path in output_paths]
        master_playlist_copy_path = shutil.copy2(master_playlist_path, export_dir)
        segment_copy_paths = [shutil.copy2(path, export_dir) for path in segment_paths]

    playlist_copy_paths = [master_playlist_copy_path] + output_copy_paths
    if not upload:
        return playlist_copy_paths

    # Strip trailing slashes so the joined remote path has exactly one.
    remote_dir = (ftp_path or "").rstrip("/")

    ftp_files = []
    for local_path in playlist_copy_paths:
        # shutil.copy2 may hand back plain strings, so go through Path to
        # get the file name.
        destination = f"{remote_dir}/{Path(local_path).name}"
        upload_to_ftp(ftp_server, ftp_username, ftp_password, local_path, destination)
        ftp_files.append(destination)

    # Segments are uploaded next to the playlists but are not reported, so
    # the result has the same shape as the local-only case.
    for local_path in segment_copy_paths:
        destination = f"{remote_dir}/{Path(local_path).name}"
        upload_to_ftp(ftp_server, ftp_username, ftp_password, local_path, destination)

    return ftp_files
def process_output(output):
    """Wrap a conversion result in the Gradio output matching its type.

    Args:
        output: A ``str`` (treated as an error message), a ``Path`` (treated
            as an error file to download) or a ``list`` of paths or URLs.

    Returns:
        Whatever the selected Gradio output component returns when called
        with the prepared value.

    Raises:
        TypeError: If *output* is none of the supported types.
    """
    # Plain text: shown as-is in a text box.
    if isinstance(output, str):
        return gr.outputs.Textbox()(output)

    # A single path: offered as a downloadable file.
    if isinstance(output, Path):
        return gr.outputs.File()(str(output))

    # Several results: rendered as a Markdown bullet list.
    if isinstance(output, list):
        renderer = gr.outputs.Markdown()
        return renderer("\n".join([f"- {item}" for item in output]))

    raise TypeError("Unexpected output type")
def main():
    """Build the Gradio interface around ``convert_video`` and launch it."""
    # NOTE(review): gr.inputs / gr.outputs and the output_processor argument
    # depend on the installed Gradio version - confirm they are supported.
    video_file = gr.inputs.File(label="Video File")
    quality = gr.inputs.Dropdown(
        choices=["18", "23", "27", "28", "32"], label="Quality", default="27")
    # Each ratio is listed once; the original list repeated "16:9".
    aspect_ratio = gr.inputs.Dropdown(
        choices=["16:9", "1:1", "4:3", "3:2", "5:4", "21:9",
                 "1.85:1", "2.35:1", "3:1", "360", "9:16",
                 "2:1", "1:2", "9:1"],
        label="Aspect Ratio", default="16:9")
    video_url = gr.inputs.Textbox(label="Video URL")
    ftp_server = gr.inputs.Textbox(label="FTP Server")
    ftp_username = gr.inputs.Textbox(label="FTP Username")
    ftp_password = gr.inputs.Textbox(label="FTP Password", type="password")
    ftp_path = gr.inputs.Textbox(label="FTP Path")
    upload = gr.inputs.Checkbox(label="Upload to FTP server", default=False)

    # The order of inputs is the order in which values are passed to
    # convert_video.
    gr.Interface(
        convert_video,
        inputs=[video_file, quality, aspect_ratio, video_url, ftp_server, ftp_username, ftp_password, ftp_path, upload],
        outputs=gr.outputs.Textbox(),
        output_processor=process_output,
        allow_flagging=False,
        live=False,
    ).launch()
# Launch the interface only when this file is executed as a script, not
# when it is imported.
if __name__ == "__main__":
    main()
|