Jeffgold's picture
Update app.py
30f78b0
raw
history blame
7.45 kB
import logging
import shutil
import tempfile
import subprocess
from pathlib import Path
from moviepy.editor import VideoFileClip
import gradio as gr
import requests
from urllib.parse import urlparse
logging.basicConfig(level=logging.INFO)
def download_file(url, destination):
"""Downloads a file from a url to a destination."""
response = requests.get(url)
response.raise_for_status()
with open(destination, 'wb') as f:
f.write(response.content)
def get_input_path(video_file, video_url, temp_dir):
"""Returns the path to the video file, downloading it if necessary."""
if video_file is not None:
return Path(video_file.name)
elif video_url:
url_path = urlparse(video_url).path
file_name = Path(url_path).name
destination = temp_dir / file_name
download_file(video_url, destination)
return destination
else:
raise ValueError("No input was provided.")
def get_output_path(input_path, temp_dir, res):
"""Returns the path to the output file, creating it if necessary."""
output_path = temp_dir / (Path(input_path).stem + f"_{res}.m3u8")
return output_path
def process_output(output):
"""Process output and display it appropriately."""
if isinstance(output, str):
# If output is a string, assume it's an error message and display it as text.
return gr.outputs.Textbox()(output)
elif isinstance(output, Path):
# If output is a Path, assume it's an error file and provide it as a downloadable file.
return gr.outputs.File()(str(output))
elif isinstance(output, list):
# If output is a list, assume it's a list of Paths or URLs and display it as a markdown list.
return gr.outputs.Markdown()("\n".join(f"- {o}" for o in output))
else:
raise TypeError("Unexpected output type")
gr.Interface(
convert_video,
inputs=[video_file, quality, aspect_ratio, video_url, api_key, upload],
outputs=gr.outputs.Any(),
output_processor=process_output,
allow_flagging=False,
live=False,
).launch()
def get_aspect_ratio(input_path, aspect_ratio):
"""Returns the aspect ratio of the video, calculating it if necessary."""
if aspect_ratio is not None:
return aspect_ratio
video = VideoFileClip(str(input_path))
return f"{video.size[0]}:{video.size[1]}"
def upload_to_web3_storage(api_key, path):
"""Uploads a file to web3.storage using the HTTP API."""
headers = {"Authorization": f"Bearer {api_key}"}
with open(path, 'rb') as f:
response = requests.post(
"https://api.web3.storage/upload",
headers=headers,
data=f,
)
response.raise_for_status() # Raises an exception if the request failed
try:
cid = response.json()["value"]["cid"]
return f"https://dweb.link/ipfs/{cid}"
except KeyError:
logging.exception("An error occurred while uploading to web3.storage.")
return f"An error occurred while uploading to web3.storage: {response.json()}"
def create_master_playlist(output_paths, temp_dir):
"""Creates a master playlist .m3u8 file that includes all other .m3u8 files."""
master_playlist_path = temp_dir / "master_playlist.m3u8"
with open(master_playlist_path, 'w') as f:
f.write("#EXTM3U\n")
for path in output_paths:
f.write(f"#EXT-X-STREAM-INF:BANDWIDTH={1000*1000},RESOLUTION={path.stem.split('_')[-1]}\n")
f.write(f"{path.name}\n")
return master_playlist_path
def convert_video(video_file, quality, aspect_ratio, video_url, api_key, upload):
standard_resolutions = [4320, 2160, 1440, 1080, 720, 480] # 8K, 4K, 2K, Full HD, HD, SD in pixels
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = Path(temp_dir)
input_path = get_input_path(video_file, video_url, temp_dir)
aspect_ratio = get_aspect_ratio(input_path, aspect_ratio)
video = VideoFileClip(str(input_path))
original_height = video.size[1]
output_paths = [] # Define output_paths as an empty list
for res in standard_resolutions:
# Skip if resolution is higher than original
if res > original_height:
continue
scale = "-1:" + str(res) # we scale the height to res and keep aspect ratio
output_path = get_output_path(input_path, temp_dir, str(res) + 'p') # pass the resolution to create a unique output file
ffmpeg_command = [
"ffmpeg", "-i", str(input_path), "-c:v", "libx264", "-crf", str(quality),
"-vf", f"scale={scale},setsar={aspect_ratio}", "-hls_time", "6",
"-hls_playlist_type", "vod", "-f", "hls", str(output_path)
]
try:
logging.info("Running command: %s", " ".join(ffmpeg_command))
subprocess.run(ffmpeg_command, check=True, timeout=600, stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
logging.exception("ffmpeg command failed.")
error_file_path = temp_dir / "error.txt"
with open(error_file_path, 'w') as error_file:
error_file.write("ffmpeg command failed:\n")
error_file.write(e.stderr.decode())
return error_file_path
except subprocess.TimeoutExpired:
logging.exception("ffmpeg command timed out.")
return "ffmpeg command timed out."
except FileNotFoundError:
logging.exception("ffmpeg is not installed.")
return "ffmpeg is not installed."
except Exception as e:
logging.exception("An error occurred.")
return f"An error occurred: {str(e)}"
output_paths.append(output_path) # Append the output_path to output_paths
if not output_paths:
return "The video is smaller than the smallest standard resolution."
# Create master playlist
master_playlist_path = create_master_playlist(output_paths, temp_dir)
output_copy_paths = [shutil.copy2(path, tempfile.gettempdir()) for path in output_paths]
master_playlist_copy_path = shutil.copy2(master_playlist_path, tempfile.gettempdir())
if upload:
return [upload_to_web3_storage(api_key, path) for path in [master_playlist_copy_path] + output_copy_paths]
else:
return [master_playlist_copy_path] + output_copy_paths
def main():
video_file = gr.inputs.File(label="Video File")
quality = gr.inputs.Dropdown(
choices=["18", "23", "27", "28", "32"], label="Quality", default="27")
aspect_ratio = gr.inputs.Dropdown(
choices=["16:9", "1:1", "4:3", "3:2", "5:4", "21:9",
"1.85:1", "2.35:1", "3:1", "360", "9:16", "16:9",
"2:1", "1:2", "9:1"],
label="Aspect Ratio", default="16:9")
video_url = gr.inputs.Textbox(label="Video URL")
api_key = gr.inputs.Textbox(label="web3.storage API Key")
upload = gr.inputs.Checkbox(label="Upload to web3.storage", default=False)
gr.Interface(
convert_video,
inputs=[video_file, quality, aspect_ratio, video_url, api_key, upload],
outputs=gr.outputs.File(label="Download File"),
allow_flagging=False,
live=False,
).launch()
if __name__ == "__main__":
main()