chihaja / facefusion /process_manager.py
salominavina's picture
Upload 74 files
efdd9ee verified
raw
history blame contribute delete
No virus
990 Bytes
from typing import Generator, List
from facefusion.typing import QueuePayload, ProcessState
PROCESS_STATE : ProcessState = 'pending'
def get_process_state() -> ProcessState:
return PROCESS_STATE
def set_process_state(process_state : ProcessState) -> None:
global PROCESS_STATE
PROCESS_STATE = process_state
def is_checking() -> bool:
return get_process_state() == 'checking'
def is_processing() -> bool:
return get_process_state() == 'processing'
def is_stopping() -> bool:
return get_process_state() == 'stopping'
def is_pending() -> bool:
return get_process_state() == 'pending'
def check() -> None:
set_process_state('checking')
def start() -> None:
set_process_state('processing')
def stop() -> None:
set_process_state('stopping')
def end() -> None:
set_process_state('pending')
def manage(queue_payloads : List[QueuePayload]) -> Generator[QueuePayload, None, None]:
for query_payload in queue_payloads:
if is_processing():
yield query_payload