from typing import Any, List, Callable import psutil import os from concurrent.futures import ThreadPoolExecutor, as_completed from queue import Queue from .image import ChainImgProcessor from tqdm import tqdm import cv2 def create_queue(temp_frame_paths: List[str]) -> Queue[str]: queue: Queue[str] = Queue() for frame_path in temp_frame_paths: queue.put(frame_path) return queue def pick_queue(queue: Queue[str], queue_per_future: int) -> List[str]: queues = [] for _ in range(queue_per_future): if not queue.empty(): queues.append(queue.get()) return queues class ChainBatchImageProcessor(ChainImgProcessor): chain = None func_params_gen = None num_threads = 1 def __init__(self): ChainImgProcessor.__init__(self) def init_with_plugins(self): self.init_plugins(["core"]) self.display_init_info() init_on_start_arr = self.init_on_start.split(",") for proc_id in init_on_start_arr: self.init_processor(proc_id) def update_progress(self, progress: Any = None) -> None: process = psutil.Process(os.getpid()) memory_usage = process.memory_info().rss / 1024 / 1024 / 1024 progress.set_postfix({ 'memory_usage': '{:.2f}'.format(memory_usage).zfill(5) + 'GB', 'execution_threads': self.num_threads }) progress.refresh() progress.update(1) def process_frames(self, source_files: List[str], target_files: List[str], current_files, update: Callable[[], None]) -> None: for f in current_files: temp_frame = cv2.imread(f) if temp_frame is not None: if self.func_params_gen: params = self.func_params_gen(None, temp_frame) else: params = {} resimg, _ = self.run_chain(temp_frame, params, self.chain) if resimg is not None: i = source_files.index(f) cv2.imwrite(target_files[i], resimg) if update: update() def run_batch_chain(self, source_files, target_files, threads:int = 1, chain = None, params_frame_gen_func = None): self.chain = chain self.func_params_gen = params_frame_gen_func progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' total = len(source_files) self.num_threads = threads with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: with ThreadPoolExecutor(max_workers=threads) as executor: futures = [] queue = create_queue(source_files) queue_per_future = max(len(source_files) // threads, 1) while not queue.empty(): future = executor.submit(self.process_frames, source_files, target_files, pick_queue(queue, queue_per_future), lambda: self.update_progress(progress)) futures.append(future) for future in as_completed(futures): future.result()