# ai-editor / app / processor.py — beat-synced video compilation pipeline
# -*- coding: utf-8 -*-
import os, glob, json, re
import numpy as np
import librosa
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector
from moviepy.editor import (
VideoFileClip, concatenate_videoclips, TextClip, CompositeVideoClip,
AudioFileClip, ImageClip
)
def parse_duration_to_seconds(value):
    """Convert a duration specification to seconds.

    Accepts a number, an ``"NNs"`` suffix form (e.g. ``"45s"``), a clock form
    ``"H:MM:SS(.f)"`` / ``"MM:SS(.f)"``, or a plain numeric string.

    Parameters:
        value: int | float | str | None — duration specification.

    Returns:
        float seconds, or None when the input is None or unparseable.
    """
    if value is None:
        return None
    if isinstance(value, (int, float)):
        return float(value)
    s = str(value).strip().lower()
    # "90s"-style seconds suffix.
    if s.endswith('s'):
        try:
            return float(s[:-1])
        except ValueError:
            pass  # not a numeric prefix; fall through to the clock formats
    # "H:MM:SS.f" or "MM:SS.f"; the hours group is optional.
    m = re.match(r'^(?:(\d+):)?(\d{1,2}):(\d{1,2})(?:\.(\d+))?$', s)
    if m:
        h = int(m.group(1) or 0)
        mm = int(m.group(2) or 0)
        ss = int(m.group(3) or 0)
        total = h * 3600 + mm * 60 + ss
        frac = m.group(4)
        if frac:
            total += float("0." + frac)
        return float(total)
    # Bare numeric string, e.g. "12.5".
    try:
        return float(s)
    except ValueError:
        return None
def detect_beats(music_path):
    """Estimate tempo and beat positions for an audio file.

    Returns:
        (tempo_bpm, beat_times): estimated tempo and a list of beat
        timestamps in seconds.
    """
    signal, rate = librosa.load(music_path, sr=None, mono=True)
    tempo, beat_frames = librosa.beat.beat_track(y=signal, sr=rate, units='frames')
    beat_times = librosa.frames_to_time(beat_frames, sr=rate)
    return tempo, beat_times.tolist()
def detect_scenes(video_path, threshold=27.0, min_scene_len=12):
    """Run PySceneDetect content-based cut detection on one video.

    Returns:
        List of (start_seconds, end_seconds) tuples, one per detected scene.
    """
    manager = VideoManager([video_path])
    scenes = SceneManager()
    detector = ContentDetector(threshold=threshold, min_scene_len=min_scene_len)
    scenes.add_detector(detector)
    manager.start()
    scenes.detect_scenes(frame_source=manager)
    detected = scenes.get_scene_list()
    manager.release()
    return [(start.get_seconds(), end.get_seconds()) for start, end in detected]
def choose_segments_from_scenes(scene_ranges, beats, min_slice=0.7):
    """Pick one cut per beat-to-beat window.

    For each consecutive pair of beat times, prefer the first scene range
    that overlaps the window by at least ``min_slice`` seconds (clamped to
    the window). If no scene qualifies, fall back to the whole window when
    the window itself is at least ``min_slice`` long; otherwise skip it.

    Returns:
        List of (start_seconds, end_seconds) segments.
    """
    chosen = []
    for window_start, window_end in zip(beats, beats[1:]):
        segment = None
        for scene_start, scene_end in scene_ranges:
            lo = max(scene_start, window_start)
            hi = min(scene_end, window_end)
            if hi - lo >= min_slice:
                segment = (lo, hi)
                break
        if segment is None and window_end - window_start >= min_slice:
            segment = (window_start, window_end)
        if segment is not None:
            chosen.append(segment)
    return chosen
def resize_fit(clip, target_w, target_h):
    """Scale a clip to cover the target frame, then center-crop it.

    Aspect-fill: the clip is resized so it fully covers target_w x target_h,
    and the overflowing dimension is cropped symmetrically about the center.

    Returns:
        A clip of exactly (target_w, target_h).
    """
    src_w, src_h = clip.size
    src_ratio = src_w / src_h
    if src_ratio > target_w / target_h:
        # Source is wider than the target: match heights, crop excess width.
        scaled_w = int(round(src_ratio * target_h))
        scaled = clip.resize(height=target_h)
        return scaled.crop(x_center=scaled_w // 2, width=target_w, height=target_h)
    # Source is taller (or same ratio): match widths, crop excess height.
    scaled_h = int(round(target_w / src_ratio))
    scaled = clip.resize(width=target_w)
    return scaled.crop(y_center=scaled_h // 2, width=target_w, height=target_h)
def build_video_single_aspect(segments, W, H, music_path, out_path, intro_text=None, logo_path=None, crossfade=0.0, fps=30):
    """Render one compiled video at W x H to out_path.

    Parameters:
        segments: list of (video_path, (start_s, end_s)) cuts, in output order.
        W, H: output frame size in pixels.
        music_path: soundtrack file; laid under the video, trimmed to length.
        out_path: destination video path.
        intro_text: optional title text overlaid for the first 2 seconds.
        logo_path: optional watermark image placed bottom-right.
        crossfade: seconds of crossfade between consecutive clips (0 = hard cuts).
        fps: output frame rate.

    Raises:
        RuntimeError: if `segments` is empty.
    """
    clips=[]
    for path,(s,e) in segments:
        c = VideoFileClip(path).subclip(s,e)
        c = resize_fit(c, W, H)
        if crossfade>0:
            # First clip has nothing to fade from; only later clips fade in.
            c = c.crossfadein(crossfade) if clips else c
        clips.append(c)
    if not clips:
        raise RuntimeError("No segments to compile.")
    # Negative padding overlaps consecutive clips so the fade-in regions align.
    body = concatenate_videoclips(clips, method="compose", padding=-crossfade if crossfade>0 else 0)
    overlays = []
    if intro_text:
        try:
            txt = TextClip(intro_text, fontsize=90, font="Arial-Bold", color="white").set_duration(2).set_pos("center")
        except Exception:
            # Named font may be unavailable on this host; retry with the default font.
            txt = TextClip(intro_text, fontsize=90, color="white").set_duration(2).set_pos("center")
        overlays.append(txt.set_start(0))
    if logo_path and os.path.exists(logo_path):
        # Watermark at ~18% of the frame width, inset 40px from the bottom-right corner.
        logo = ImageClip(logo_path).set_duration(body.duration).resize(width=int(W*0.18)).set_pos(("right","bottom")).margin(right=40, bottom=40, opacity=0)
        overlays.append(logo)
    final = CompositeVideoClip([body] + overlays, size=(W,H))
    if os.path.exists(music_path):
        # Trim the soundtrack to the video length; replaces any source audio.
        a = AudioFileClip(music_path).subclip(0, final.duration)
        final = final.set_audio(a)
    final.write_videofile(out_path, codec="libx264", audio_codec="aac", fps=fps, threads=4)
def run_job(job_dir):
    """Execute a full editing job described by ``<job_dir>/config.json``.

    Expects ``footage/`` (``.mp4``/``.mov`` files), a ``music`` file (with or
    without extension), and optionally ``logo.png`` inside ``job_dir``.

    Returns:
        List of paths to the rendered output videos, one per requested aspect.

    Raises:
        RuntimeError: when footage or music is missing.
    """
    cfg_path = os.path.join(job_dir, "config.json")
    with open(cfg_path, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    # --- Inputs -----------------------------------------------------------
    footage_dir = os.path.join(job_dir, "footage")
    music_path = os.path.join(job_dir, "music")
    if not os.path.exists(music_path):
        # The upload may have kept its original extension; probe common ones.
        for ext in (".mp3", ".wav", ".m4a"):
            candidate = music_path + ext
            if os.path.exists(candidate):
                music_path = candidate
                break
    logo_path = os.path.join(job_dir, "logo.png")
    if not os.path.exists(footage_dir):
        raise RuntimeError("footage/ missing")
    videos = sorted(glob.glob(os.path.join(footage_dir, "*.mp4")) +
                    glob.glob(os.path.join(footage_dir, "*.mov")))
    if not videos:
        raise RuntimeError("No videos found in footage/. Upload .mp4 or .mov files.")
    if not os.path.exists(music_path):
        raise RuntimeError("Music file missing.")

    # --- Config -----------------------------------------------------------
    duration = parse_duration_to_seconds(cfg.get("duration"))
    intro_text = cfg.get("intro_text") or None
    crossfade = float(cfg.get("crossfade", 0.0))
    aspects = cfg.get("aspects", ["9:16"])
    if isinstance(aspects, str):
        # Tolerate a single aspect given as a bare string instead of a list.
        aspects = [aspects]
    threshold = float(cfg.get("scene_threshold", 27.0))

    # --- Analysis ---------------------------------------------------------
    tempo, beats = detect_beats(music_path)
    scene_map = {p: detect_scenes(p, threshold=threshold) for p in videos}

    # --- Candidate segments, beat-aligned per source video -----------------
    segs = []
    for p in videos:
        for (s, e) in choose_segments_from_scenes(scene_map[p], beats):
            segs.append((p, (s, e)))

    # --- Trim the segment list to the requested total duration -------------
    if duration:
        trimmed, acc = [], 0.0
        for p, (s, e) in segs:
            dur = e - s
            if acc + dur > duration:
                # Shorten the final segment, but never below 0.7 s.
                e = s + max(0.7, duration - acc)
                dur = e - s
            trimmed.append((p, (s, e)))
            acc += dur
            if acc >= duration:
                break
        segs = trimmed

    def parse_aspect(a):
        """Map an aspect label ('9:16', '16:9', or 'WxH') to (width, height)."""
        if a in ("9:16", "1080x1920"):
            return (1080, 1920)
        if a in ("16:9", "1920x1080"):
            return (1920, 1080)
        if "x" in a:
            try:
                w, h = [int(x) for x in a.split("x")]
                return (w, h)
            except ValueError:
                pass  # malformed WxH -> fall through to the default
        return (1080, 1920)  # default: vertical 9:16

    # --- Render one output per requested aspect ----------------------------
    outputs = []
    for a in aspects:
        W, H = parse_aspect(a)
        out_path = os.path.join(job_dir, f"output_{W}x{H}.mp4")
        build_video_single_aspect(
            segs, W, H, music_path, out_path,
            intro_text=intro_text,
            logo_path=(logo_path if os.path.exists(logo_path) else None),
            crossfade=crossfade, fps=30,
        )
        outputs.append(out_path)
    return outputs