import logging import requests from urllib.parse import urlparse, quote_plus import subprocess from pathlib import Path from moviepy.editor import VideoFileClip import gradio as gr import http.server import socketserver import threading import socket import boto3 import s3fs import os # Define output directory output_dir = Path.cwd() / "output" output_dir.mkdir(exist_ok=True) # Set S3 credentials AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET") AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") session = boto3.Session( aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name='us-east-1' ) s3 = session.client('s3') fs = s3fs.S3FileSystem(anon=False, client_kwargs={'region_name': 'us-east-1'}) logging.basicConfig(level=logging.INFO) standard_resolutions = [4320, 2160, 1440, 1080, 720, 480, 360, 240] def start_http_server(port=8000): handler = http.server.SimpleHTTPRequestHandler httpd = socketserver.TCPServer(("", port), handler) logging.info(f"Serving at port {port}") thread = threading.Thread(target=httpd.serve_forever) thread.start() def get_ip_address(): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: s.connect(('10.255.255.255', 1)) IP = s.getsockname()[0] except Exception: IP = '127.0.0.1' finally: s.close() return IP def download_file(url, destination): response = requests.get(url) response.raise_for_status() with open(destination, 'wb') as f: f.write(response.content) def get_input_path(video_file, video_url): if video_file is not None: return Path(video_file.name) elif video_url: url_path = urlparse(video_url).path file_name = Path(url_path).name destination = output_dir / file_name download_file(video_url, destination) return destination else: raise ValueError("No input was provided.") def get_output_path(input_path, res): output_path = output_dir / f"{Path(input_path).stem}_{res}p.m3u8" return output_path def create_master_playlist(output_paths, input_filename): master_playlist_path = output_dir / f"{input_filename}_master_playlist.m3u8" with open(master_playlist_path, "w") as f: f.write("#EXTM3U\n") for path in output_paths: res = Path(path).stem.split("_")[1] f.write(f'#EXT-X-STREAM-INF:BANDWIDTH=8000000,RESOLUTION={res}\n') # read the .m3u8 file and replace relative links with absolute links with open(path, 'r') as playlist_file: content = playlist_file.readlines() content = [line.replace('.ts', f'.ts\n') if line.endswith('.ts\n') else line for line in content] content = [f"{quote_plus(line.rstrip())}\n" if line.endswith('.ts\n') else line for line in content] f.writelines(content) return master_playlist_path def upload_file_to_s3(file_path, s3_path): print(f"Uploading {file_path} to AWS at {s3_path}") with open(file_path, 'rb') as file: file_content = file.read() with fs.open(s3_path, 'wb') as file: file.write(file_content) def convert_video(video_file, quality, aspect_ratio, video_url): # Ensure either a file or a URL is provided if not video_file and not video_url: raise ValueError("Please provide either a video file or a video URL.") input_path = get_input_path(video_file, video_url) video = VideoFileClip(str(input_path)) # Get the original height to avoid upscaling original_height = video.size[1] # Get the original height to avoid upscaling original_height = video.size[1] output_paths = [] for res in reversed(standard_resolutions): if res > original_height: continue scale = "-1:" + str(res) output_path = get_output_path(input_path, str(res) + 'p') # use a lower CRF value for better quality ffmpeg_command = [ "ffmpeg", "-i", str(input_path), "-c:v", "libx264", "-crf", str(quality), "-vf", f"scale={scale}:force_original_aspect_ratio=decrease,pad=ceil(iw/2)*2:ceil(ih/2)*2", "-hls_time", "10", "-hls_playlist_type", "vod", "-hls_segment_filename", str(output_dir / f"{output_path.stem}_%03d.ts"), str(output_path) ] logging.info("Running ffmpeg command: " + ' '.join(ffmpeg_command)) subprocess.run(ffmpeg_command, check=True) output_paths.append(output_path) master_playlist_path = create_master_playlist(output_paths, Path(input_path).stem) output_paths.append(master_playlist_path) output_links = [] for path in output_paths: s3_path = f"s3://{AWS_S3_BUCKET}/{Path(input_path).stem}/{Path(path).name}" upload_file_to_s3(path, s3_path) output_links.append(f'Download {Path(path).stem}') # upload ts files to s3 for path in output_dir.glob(f"{Path(input_path).stem}_*p_*.ts"): s3_path = f"s3://{AWS_S3_BUCKET}/{Path(input_path).stem}/{Path(path).name}" upload_file_to_s3(path, s3_path) output_html = "
".join(output_links) return output_html video_file = gr.inputs.File(label="Your video file") quality = gr.inputs.Slider(minimum=0, maximum=51, default=23, label="Quality (0: Highest Quality - 50: Fastest)") aspect_ratio = gr.inputs.Dropdown(["16:9", "4:3", "1:1", "3:4", "9:16", "1:1", "2:1", "1:2"], label="Aspect Ratio", default="16:9") video_url = gr.inputs.Textbox(lines=1, placeholder="or paste video url here", label="Video URL") interface = gr.Interface( fn=convert_video, inputs=[video_file, quality, aspect_ratio, video_url], outputs=gr.outputs.HTML(label="Download Links"), title="NEAR Hub Video Transcoder to m3u8", description="convert video files to m3u8 for VOD streaming in nearhub.club.", allow_flagging='never' ) start_http_server() interface.launch(server_name=get_ip_address(), server_port=7860)