import os import sys import importlib import psutil from concurrent.futures import ThreadPoolExecutor, as_completed from queue import Queue from types import ModuleType from typing import Any, List, Callable from tqdm import tqdm import DeepFakeAI.globals from DeepFakeAI import wording FRAME_PROCESSORS_MODULES : List[ModuleType] = [] FRAME_PROCESSORS_METHODS =\ [ 'get_frame_processor', 'clear_frame_processor', 'pre_check', 'pre_process', 'process_frame', 'process_frames', 'process_image', 'process_video', 'post_process' ] def load_frame_processor_module(frame_processor : str) -> Any: try: frame_processor_module = importlib.import_module('DeepFakeAI.processors.frame.modules.' + frame_processor) for method_name in FRAME_PROCESSORS_METHODS: if not hasattr(frame_processor_module, method_name): raise NotImplementedError except ModuleNotFoundError: sys.exit(wording.get('frame_processor_not_loaded').format(frame_processor = frame_processor)) except NotImplementedError: sys.exit(wording.get('frame_processor_not_implemented').format(frame_processor = frame_processor)) return frame_processor_module def get_frame_processors_modules(frame_processors : List[str]) -> List[ModuleType]: global FRAME_PROCESSORS_MODULES if not FRAME_PROCESSORS_MODULES: for frame_processor in frame_processors: frame_processor_module = load_frame_processor_module(frame_processor) FRAME_PROCESSORS_MODULES.append(frame_processor_module) return FRAME_PROCESSORS_MODULES def clear_frame_processors_modules() -> None: global FRAME_PROCESSORS_MODULES for frame_processor_module in get_frame_processors_modules(DeepFakeAI.globals.frame_processors): frame_processor_module.clear_frame_processor() FRAME_PROCESSORS_MODULES = [] def multi_process_frame(source_path : str, temp_frame_paths : List[str], process_frames: Callable[[str, List[str], Any], None], update: Callable[[], None]) -> None: with ThreadPoolExecutor(max_workers = DeepFakeAI.globals.execution_thread_count) as executor: futures = [] queue = create_queue(temp_frame_paths) queue_per_future = max(len(temp_frame_paths) // DeepFakeAI.globals.execution_thread_count * DeepFakeAI.globals.execution_queue_count, 1) while not queue.empty(): future = executor.submit(process_frames, source_path, pick_queue(queue, queue_per_future), update) futures.append(future) for future in as_completed(futures): future.result() def create_queue(temp_frame_paths : List[str]) -> Queue[str]: queue: Queue[str] = Queue() for frame_path in temp_frame_paths: queue.put(frame_path) return queue def pick_queue(queue : Queue[str], queue_per_future : int) -> List[str]: queues = [] for _ in range(queue_per_future): if not queue.empty(): queues.append(queue.get()) return queues def process_video(source_path : str, frame_paths : List[str], process_frames : Callable[[str, List[str], Any], None]) -> None: progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' total = len(frame_paths) with tqdm(total = total, desc = wording.get('processing'), unit = 'frame', dynamic_ncols = True, bar_format = progress_bar_format) as progress: multi_process_frame(source_path, frame_paths, process_frames, lambda: update_progress(progress)) def update_progress(progress : Any = None) -> None: process = psutil.Process(os.getpid()) memory_usage = process.memory_info().rss / 1024 / 1024 / 1024 progress.set_postfix( { 'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB', 'execution_providers': DeepFakeAI.globals.execution_providers, 'execution_thread_count': DeepFakeAI.globals.execution_thread_count, 'execution_queue_count': DeepFakeAI.globals.execution_queue_count }) progress.refresh() progress.update(1) def get_device() -> str: if 'CUDAExecutionProvider' in DeepFakeAI.globals.execution_providers: return 'cuda' if 'CoreMLExecutionProvider' in DeepFakeAI.globals.execution_providers: return 'mps' return 'cpu'