import datetime
import glob
import os
import platform
import shutil
import subprocess
import time
from threading import Thread

import cv2
import numpy as np
from moviepy.editor import VideoFileClip, ImageSequenceClip
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip


def trim_video(video_path, output_path, start_frame, stop_frame):
    """Cut the frame range [start_frame, stop_frame) out of a video.

    The result is written as ``<output_path>/trim/<name>_trimmed.mp4``
    (H.264 video, AAC audio); the ``trim`` directory is created if needed.

    Args:
        video_path: Path of the source video.
        output_path: Directory under which the ``trim`` folder is created.
        start_frame: Index of the first frame to keep.
        stop_frame: Index of the frame at which the cut ends.

    Returns:
        The path of the trimmed video file.
    """
    video_name, _ = os.path.splitext(os.path.basename(video_path))
    trimmed_video_filename = video_name + "_trimmed" + ".mp4"
    temp_path = os.path.join(output_path, "trim")
    os.makedirs(temp_path, exist_ok=True)
    trimmed_video_file_path = os.path.join(temp_path, trimmed_video_filename)

    video = VideoFileClip(video_path)
    try:
        # Frame indices are converted to seconds using the source frame rate.
        fps = video.fps
        start_time = start_frame / fps
        duration = (stop_frame - start_frame) / fps
        trimmed_video = video.subclip(start_time, start_time + duration)
        try:
            trimmed_video.write_videofile(
                trimmed_video_file_path, codec="libx264", audio_codec="aac"
            )
        finally:
            # Close even when encoding fails so reader resources are freed.
            trimmed_video.close()
    finally:
        video.close()
    return trimmed_video_file_path


def open_directory(path=None):
    """Open *path* with the platform's default file handler.

    Does nothing when *path* is ``None``. Uses ``os.startfile`` on Windows,
    ``open`` on macOS and ``xdg-open`` everywhere else.
    """
    if path is None:
        return
    system = platform.system()
    if system == "Windows":
        # os.startfile only exists on Windows.
        os.startfile(path)
    elif system == "Darwin":
        subprocess.Popen(["open", path])
    else:
        subprocess.Popen(["xdg-open", path])


class StreamerThread(object):
    """Read frames from a cv2 capture source on a background thread.

    The most recent result of ``capture.read()`` is kept in ``self.status``
    and ``self.frame``.
    """

    def __init__(self, src=0):
        self.capture = cv2.VideoCapture(src)
        self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 2)
        # Despite the name, FPS holds the delay between reads in seconds
        # (1/30 s); FPS_MS is the same delay in whole milliseconds.
        self.FPS = 1 / 30
        self.FPS_MS = int(self.FPS * 1000)
        self.thread = None
        self.stopped = False
        # Initialised here so readers can access them before the first read.
        self.status = False
        self.frame = None

    def start(self):
        """Start the daemon thread that runs :meth:`update`."""
        self.thread = Thread(target=self.update, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the reader loop, wait for the thread and release the capture."""
        self.stopped = True
        # stop() may be called without start(); there is no thread to join then.
        if self.thread is not None:
            self.thread.join()
        self.capture.release()
        print("stopped")

    def update(self):
        """Loop until stopped, storing the latest frame after each read."""
        while not self.stopped:
            if self.capture.isOpened():
                self.status, self.frame = self.capture.read()
            time.sleep(self.FPS)


class ProcessBar:
    """Textual progress bar with an estimated-time-remaining readout."""

    def __init__(self, bar_length, total, before="⬛", after="🟨"):
        """Create a bar of *bar_length* cells tracking *total* iterations.

        Args:
            bar_length: Number of cells in the bar.
            total: Total number of iterations expected.
            before: Symbol for a cell that has not been reached yet.
            after: Symbol for a cell that has been reached.
        """
        self.bar_length = bar_length
        self.total = total
        self.before = before
        self.after = after
        self.bar = [self.before] * bar_length
        self.start_time = time.time()

    def get(self, index):
        """Mark iteration *index* (0-based) as done and return the status text.

        Returns:
            A string of the form ``(done/total) <bar> (ETR: M min S sec)``.
        """
        total = self.total
        completed = index + 1
        elapsed_time = time.time() - self.start_time
        # Guard against division by zero when nothing has completed yet.
        average_time_per_iteration = (
            elapsed_time / completed if completed > 0 else 0.0
        )
        # Never report a negative remaining time if index overshoots total.
        remaining_iterations = max(total - completed, 0)
        estimated_remaining_time = remaining_iterations * average_time_per_iteration

        cell_count = len(self.bar)
        if total > 0 and cell_count > 0:
            # Clamp so that index >= total fills the last cell instead of
            # indexing past the end of the bar.
            cell = int(index / total * self.bar_length)
            cell = min(max(cell, 0), cell_count - 1)
            self.bar[cell] = self.after

        info_text = f"({index+1}/{total}) {''.join(self.bar)} "
        info_text += f"(ETR: {int(estimated_remaining_time // 60)} min {int(estimated_remaining_time % 60)} sec)"
        return info_text


# Loaded once at import time; cv2.imread returns None when the file is missing.
logo_image = cv2.imread("./assets/images/logo.png", cv2.IMREAD_UNCHANGED)


def add_logo_to_image(img, logo=logo_image):
    """Blend *logo* into the bottom-right corner of *img*, in place.

    The logo is resized to a square whose side is 10% of the image width and
    placed with a margin of 10% of the logo size. A fourth logo channel, when
    present, is used as the alpha mask; otherwise the logo is drawn opaque.

    Args:
        img: Image array with at least three channels; modified in place.
        logo: Logo array, or ``None`` if no logo is available.

    Returns:
        The same *img* array. It is returned unchanged when *logo* is
        ``None`` or when the image is too small to hold the logo.
    """
    if logo is None:
        return img

    logo_size = int(img.shape[1] * 0.1)
    if logo_size < 1:
        return img
    padding = int(logo_size * 0.1)
    top = img.shape[0] - logo_size - padding
    left = img.shape[1] - logo_size - padding
    if top < 0 or left < 0:
        # A negative offset would wrap around and corrupt the slicing below.
        return img

    logo = cv2.resize(logo, (logo_size, logo_size))
    if logo.ndim == 2:
        # Single-channel logo: expand to three channels so it can be blended.
        logo = cv2.cvtColor(logo, cv2.COLOR_GRAY2BGR)

    if logo.shape[2] == 4:
        alpha = logo[:, :, 3] / 255.0
    else:
        alpha = np.ones(logo.shape[:2], dtype=float)
    alpha = alpha[:, :, np.newaxis]  # broadcast over the colour channels

    region = img[top : top + logo_size, left : left + logo_size, :3]
    img[top : top + logo_size, left : left + logo_size, :3] = (
        alpha * logo[:, :, :3] + (1 - alpha) * region
    )
    return img