leonelhs's picture
init space
02c4dcb
import sys
import importlib
from concurrent.futures import ThreadPoolExecutor, as_completed
from queue import Queue
from types import ModuleType
from typing import Any, List
from tqdm import tqdm
import facefusion.globals
from facefusion.typing import Process_Frames
from facefusion import wording
from facefusion.utilities import encode_execution_providers
FRAME_PROCESSORS_MODULES : List[ModuleType] = []
FRAME_PROCESSORS_METHODS =\
[
'get_frame_processor',
'clear_frame_processor',
'get_options',
'set_options',
'register_args',
'apply_args',
'pre_check',
'pre_process',
'process_frame',
'process_frames',
'process_image',
'process_video',
'post_process'
]
def load_frame_processor_module(frame_processor : str) -> Any:
try:
frame_processor_module = importlib.import_module('facefusion.processors.frame.modules.' + frame_processor)
for method_name in FRAME_PROCESSORS_METHODS:
if not hasattr(frame_processor_module, method_name):
raise NotImplementedError
except ModuleNotFoundError:
sys.exit(wording.get('frame_processor_not_loaded').format(frame_processor = frame_processor))
except NotImplementedError:
sys.exit(wording.get('frame_processor_not_implemented').format(frame_processor = frame_processor))
return frame_processor_module
def get_frame_processors_modules(frame_processors : List[str]) -> List[ModuleType]:
global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES:
for frame_processor in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
return FRAME_PROCESSORS_MODULES
def clear_frame_processors_modules() -> None:
global FRAME_PROCESSORS_MODULES
for frame_processor_module in get_frame_processors_modules(facefusion.globals.frame_processors):
frame_processor_module.clear_frame_processor()
FRAME_PROCESSORS_MODULES = []
def multi_process_frames(source_path : str, temp_frame_paths : List[str], process_frames : Process_Frames) -> None:
with tqdm(total = len(temp_frame_paths), desc = wording.get('processing'), unit = 'frame', ascii = ' =') as progress:
progress.set_postfix(
{
'execution_providers': encode_execution_providers(facefusion.globals.execution_providers),
'execution_thread_count': facefusion.globals.execution_thread_count,
'execution_queue_count': facefusion.globals.execution_queue_count
})
with ThreadPoolExecutor(max_workers = facefusion.globals.execution_thread_count) as executor:
futures = []
queue_temp_frame_paths : Queue[str] = create_queue(temp_frame_paths)
queue_per_future = max(len(temp_frame_paths) // facefusion.globals.execution_thread_count * facefusion.globals.execution_queue_count, 1)
while not queue_temp_frame_paths.empty():
payload_temp_frame_paths = pick_queue(queue_temp_frame_paths, queue_per_future)
future = executor.submit(process_frames, source_path, payload_temp_frame_paths, progress.update)
futures.append(future)
for future_done in as_completed(futures):
future_done.result()
def create_queue(temp_frame_paths : List[str]) -> Queue[str]:
queue : Queue[str] = Queue()
for frame_path in temp_frame_paths:
queue.put(frame_path)
return queue
def pick_queue(queue : Queue[str], queue_per_future : int) -> List[str]:
queues = []
for _ in range(queue_per_future):
if not queue.empty():
queues.append(queue.get())
return queues