# spotify_service / video_processor.py
# Pranjal Pruthi
# Add application files and dependencies
# c79705c
import glob
import os
import re
import select
import shlex
import shutil
import signal
import subprocess
import threading

import psutil
# Per-session job state, keyed by session ID. Each value is a dict whose
# "status" entry holds the latest progress/result text and whose optional
# "error_details" entry holds the stderr captured from a failed command.
status_dict = {}
# Most recently started subprocess.Popen for each session ID, kept so that
# cancel_job() can find and terminate it.
process_dict = {}
# Patterns used to turn tool output into a progress percentage.
_DOWNLOAD_PERCENT_RE = re.compile(r'(\d+\.\d+)%')
_FFMPEG_DURATION_RE = re.compile(r'Duration: (\d{2}):(\d{2}):(\d{2}\.\d{2})')
_FFMPEG_TIME_RE = re.compile(r'time=(\d{2}):(\d{2}):(\d{2}\.\d{2})')


def _clock_to_seconds(match):
    """Convert a matched (hours, minutes, seconds) triple into seconds."""
    hours, minutes, seconds = map(float, match.groups())
    return hours * 3600 + minutes * 60 + seconds


def run_command_with_progress(command, session_id, tool_name, is_trimming=False):
    """Run *command* and mirror its progress into ``status_dict[session_id]``.

    Every stdout line becomes the session status; lines containing
    ``[download]`` with a percentage, and ffmpeg ``time=`` lines on stderr,
    are condensed to ``"<tool>: NN.NN% complete"``. All stderr lines are
    collected and stored under ``"error_details"`` if the command fails.

    Args:
        command: Shell command line (``str``/``bytes``, run through the
            shell) or an argument list (run directly, without a shell).
        session_id: Key of the job in ``status_dict`` / ``process_dict``.
            An unknown session is registered instead of raising ``KeyError``
            after the process has already been started.
        tool_name: Label used in status messages; the value ``"ffmpeg"``
            additionally enables ffmpeg progress parsing.
        is_trimming: Accepted for interface compatibility; not used here.

    Returns:
        The command's exit code, or ``-1`` if the job was cancelled.
    """
    # Keep a reference to the session's own dict: cancel_job() mutates it in
    # place, so the cancel flag stays visible even if the entry is removed
    # from status_dict while the command is still running.
    state = status_dict.setdefault(session_id, {})

    def cancelled():
        return state.get("status") == "Cancelled"

    def publish(message):
        # Never overwrite a pending cancel request with progress text.
        if not cancelled():
            state["status"] = message

    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=isinstance(command, (str, bytes)),
        universal_newlines=True,
    )
    process_dict[session_id] = process
    state["status"] = f"Using {tool_name}..."

    error_lines = []
    duration = None
    open_streams = [process.stdout, process.stderr]
    try:
        while True:
            if cancelled():
                process.terminate()
                process.wait()
                return -1

            if open_streams:
                ready = select.select(open_streams, [], [], 0.1)[0]
            else:
                # Both pipes reached EOF: only the exit status is pending.
                # Wait in short slices so a cancel request is still noticed.
                try:
                    process.wait(timeout=0.1)
                except subprocess.TimeoutExpired:
                    pass
                ready = []

            for stream in ready:
                line = stream.readline()
                if not line:
                    # EOF. Stop selecting on this pipe, otherwise select()
                    # reports it readable forever and the loop spins.
                    open_streams.remove(stream)
                    continue
                text = line.strip()

                if stream is process.stdout:
                    publish(f"{tool_name}: {text}")
                    if '[download]' in line:
                        percent_match = _DOWNLOAD_PERCENT_RE.search(line)
                        if percent_match:
                            percent = min(float(percent_match.group(1)), 100.0)
                            publish(f"{tool_name}: {percent:.2f}% complete")
                    continue

                error_lines.append(text)
                if tool_name != "ffmpeg":
                    continue
                # ffmpeg reports the input duration once, then periodic
                # "time=" stamps; their ratio is the progress.
                duration_match = _FFMPEG_DURATION_RE.search(line)
                if duration_match:
                    duration = _clock_to_seconds(duration_match)
                time_match = _FFMPEG_TIME_RE.search(line)
                if time_match and duration:
                    fraction = min(_clock_to_seconds(time_match) / duration, 1)
                    publish(f"{tool_name}: {fraction*100:.2f}% complete")

            # Leave only once the process has exited AND nothing more could be
            # read, so trailing output (typically the error text) is kept.
            if not ready and process.poll() is not None:
                break

        if cancelled():
            # cancel_job() killed the process between two loop checks; report
            # the cancel instead of clobbering it with an error status.
            return -1

        return_code = process.returncode
        if return_code != 0:
            state["status"] = f"Error occurred during {tool_name} processing."
            state["error_details"] = "\n".join(error_lines)  # Store detailed error information
        return return_code
    finally:
        for stream in (process.stdout, process.stderr):
            stream.close()
        # Release the bookkeeping entry unless a newer process replaced it.
        if process_dict.get(session_id) is process:
            process_dict.pop(session_id, None)
# yt-dlp format selector passed to "-f".
_DOWNLOAD_FORMAT = "bestvideo[height<=1080][ext=mp4]+bestaudio[ext=m4a]/best[height<=1080][ext=mp4]"

# ffmpeg options placed between the input file and the output file.
_FFMPEG_ENCODE_OPTIONS = (
    "-map", "0:v:0", "-map", "0:a:0",
    "-c:v", "libx264", "-preset", "ultrafast", "-crf", "23", "-maxrate", "25M",
    "-vf", "scale=-1:1080",
    "-c:a", "aac", "-b:a", "192k", "-ac", "2", "-strict", "-2",
)


def _shell_join(argv):
    """Quote every argument and join them into one POSIX shell command line."""
    return " ".join(shlex.quote(arg) for arg in argv)


def _clear_directory(directory):
    """Delete every (non-hidden) entry of *directory*, sub-directories included."""
    for entry in glob.glob(os.path.join(glob.escape(directory), "*")):
        if os.path.isdir(entry) and not os.path.islink(entry):
            shutil.rmtree(entry)
        else:
            os.remove(entry)


def _mark_failed(state, message):
    """Store *message* as the job status unless the job was cancelled."""
    if state.get("status") != "Cancelled":
        state["status"] = message


def _trim_and_encode(video_file, trimmed_file, final_file, session_id, verbose=False):
    """Run auto-editor then ffmpeg; return the failed stage's name, or None."""
    trim_args = ["auto-editor", video_file, "--margin", "0.5sec", "--output", trimmed_file]
    if verbose:
        trim_args.append("--verbose")
    if run_command_with_progress(_shell_join(trim_args), session_id, "auto-editor", is_trimming=True) != 0:
        return "trimming"

    encode_args = ["ffmpeg", "-y", "-i", trimmed_file, *_FFMPEG_ENCODE_OPTIONS, final_file]
    if run_command_with_progress(_shell_join(encode_args), session_id, "ffmpeg") != 0:
        return "final processing"
    return None


def process_video(url, session_id):
    """Download *url* with yt-dlp, trim it with auto-editor and re-encode it.

    Files are written to ``./output_folder_<session_id>``, which is emptied
    first. A URL containing ``list=`` is handled as a playlist: every
    downloaded ``.mp4`` is processed into a ``trimmed`` sub-directory and a
    video that fails is skipped. Progress and errors are published through
    ``status_dict[session_id]``.

    Args:
        url: Video or playlist URL. It is shell-quoted before use, because it
            (like the downloaded file names) is externally controlled.
        session_id: Identifier of the job; also part of the output paths.

    Returns:
        The final file path for a single video, the ``trimmed`` directory for
        a playlist, or ``None`` when a step failed or the job was cancelled.
    """
    # Work on the session's own dict so a cancel flag set by cancel_job()
    # remains visible for the whole run.
    state = status_dict.setdefault(session_id, {})

    output_dir = f"./output_folder_{session_id}"
    os.makedirs(output_dir, exist_ok=True)
    # Clear any existing files in the output directory (a previous playlist
    # run leaves a "trimmed" sub-directory that os.remove cannot delete).
    _clear_directory(output_dir)

    is_playlist = "list=" in url
    download_command = _shell_join([
        "yt-dlp", "--username", "oauth2", "--password", "",
        "-f", _DOWNLOAD_FORMAT, "-N", "64",
        "-o", f"{output_dir}/%(title)s.%(ext)s",
        "--", url,  # "--" keeps a URL starting with "-" from being read as an option
    ])
    if run_command_with_progress(download_command, session_id, "yt-dlp") != 0:
        _mark_failed(state, "Error occurred during download.")
        return None

    downloads = sorted(glob.glob(os.path.join(glob.escape(output_dir), "*.mp4")))

    if not is_playlist:
        if not downloads:
            state["error_details"] = "Download finished but produced no .mp4 file."
            _mark_failed(state, "Error occurred during download.")
            return None
        video_file = downloads[0]
        # Strip only the extension; str.replace would also rewrite ".mp4"
        # occurring elsewhere in the path.
        stem = os.path.splitext(video_file)[0]
        trimmed_file = f"{stem}_trimmed.mp4"
        final_file = f"{stem}_f_{session_id}.mp4"
        failed_stage = _trim_and_encode(video_file, trimmed_file, final_file, session_id)
        if failed_stage:
            _mark_failed(state, f"Error occurred during {failed_stage}.")
            return None
        state["status"] = f"Processing completed! File: {final_file}"
        return final_file

    trimmed_dir = f"{output_dir}/trimmed"
    os.makedirs(trimmed_dir, exist_ok=True)
    for video_file in downloads:
        video_name = os.path.splitext(os.path.basename(video_file))[0]
        trimmed_file = f"{trimmed_dir}/{video_name}_trimmed.mp4"
        final_file = f"{trimmed_dir}/{video_name}_f.mp4"
        failed_stage = _trim_and_encode(video_file, trimmed_file, final_file, session_id, verbose=True)
        if state.get("status") == "Cancelled":
            # A cancel stops the whole playlist, not just the current video.
            return None
        if failed_stage:
            state["status"] = f"Error occurred during {failed_stage} of {video_name}."
    state["status"] = f"Processing completed! Directory: {trimmed_dir}"
    return trimmed_dir
def get_status(session_id):
    """Return the latest status text recorded for *session_id*.

    Falls back to ``"Invalid session ID."`` when the session is unknown or
    has no ``"status"`` entry.
    """
    fallback = "Invalid session ID."
    try:
        entry = status_dict[session_id]
    except KeyError:
        return fallback
    return entry.get("status", fallback)
def get_error_details(session_id):
    """Return the stored error output for *session_id*.

    Falls back to ``"No error details available."`` when the session is
    unknown or no ``"error_details"`` entry was recorded for it.
    """
    fallback = "No error details available."
    try:
        entry = status_dict[session_id]
    except KeyError:
        return fallback
    return entry.get("error_details", fallback)
def cancel_job(session_id):
    """Flag a job as cancelled and terminate its running process tree.

    The session status is set to ``"Cancelled"``. If a process is registered
    for the session it is removed from ``process_dict`` and, when still
    running, its children and then the process itself are sent a terminate
    request.

    Args:
        session_id: Identifier of the job to cancel.

    Returns:
        ``True`` if the session exists in ``status_dict``, ``False`` otherwise.
    """
    if session_id not in status_dict:
        return False
    status_dict[session_id]["status"] = "Cancelled"

    process = process_dict.pop(session_id, None)
    if process is None or process.poll() is not None:
        return True  # nothing registered, or it has already finished

    # The process (or any child) may exit on its own between poll() and the
    # calls below; psutil then raises NoSuchProcess, which just means there is
    # nothing left to stop and must not abort the cancel.
    try:
        parent = psutil.Process(process.pid)
        targets = parent.children(recursive=True) + [parent]
    except psutil.NoSuchProcess:
        targets = []
    for target in targets:
        try:
            target.terminate()
        except psutil.NoSuchProcess:
            pass
    return True
def cleanup_cancelled_jobs():
    """Remove the output folder and status entry of every cancelled job.

    Sessions whose status is anything other than ``"Cancelled"`` are left
    untouched.
    """
    # Iterate over a snapshot because entries are removed along the way.
    for job_id in tuple(status_dict):
        if status_dict[job_id]["status"] != "Cancelled":
            continue
        job_dir = f"./output_folder_{job_id}"
        if os.path.exists(job_dir):
            shutil.rmtree(job_dir)
        status_dict.pop(job_id)