Spaces:
Running
on
Zero
Running
on
Zero
| # Copyright (C) 2024-present Naver Corporation. All rights reserved. | |
| # Licensed under CC BY-NC-SA 4.0 (non-commercial use only). | |
| # | |
| # -------------------------------------------------------- | |
| # modified from DUSt3R | |
| from tqdm import tqdm | |
| from multiprocessing.dummy import Pool as ThreadPool | |
| from multiprocessing import cpu_count | |
| def parallel_threads( | |
| function, | |
| args, | |
| workers=0, | |
| star_args=False, | |
| kw_args=False, | |
| front_num=1, | |
| Pool=ThreadPool, | |
| **tqdm_kw | |
| ): | |
| """tqdm but with parallel execution. | |
| Will essentially return | |
| res = [ function(arg) # default | |
| function(*arg) # if star_args is True | |
| function(**arg) # if kw_args is True | |
| for arg in args] | |
| Note: | |
| the <front_num> first elements of args will not be parallelized. | |
| This can be useful for debugging. | |
| """ | |
| while workers <= 0: | |
| workers += cpu_count() | |
| if workers == 1: | |
| front_num = float("inf") | |
| try: | |
| n_args_parallel = len(args) - front_num | |
| except TypeError: | |
| n_args_parallel = None | |
| args = iter(args) | |
| front = [] | |
| while len(front) < front_num: | |
| try: | |
| a = next(args) | |
| except StopIteration: | |
| return front # end of the iterable | |
| front.append( | |
| function(*a) if star_args else function(**a) if kw_args else function(a) | |
| ) | |
| out = [] | |
| with Pool(workers) as pool: | |
| if star_args: | |
| futures = pool.imap(starcall, [(function, a) for a in args]) | |
| elif kw_args: | |
| futures = pool.imap(starstarcall, [(function, a) for a in args]) | |
| else: | |
| futures = pool.imap(function, args) | |
| for f in tqdm(futures, total=n_args_parallel, **tqdm_kw): | |
| out.append(f) | |
| return front + out | |
| def parallel_processes(*args, **kwargs): | |
| """Same as parallel_threads, with processes""" | |
| import multiprocessing as mp | |
| kwargs["Pool"] = mp.Pool | |
| return parallel_threads(*args, **kwargs) | |
| def starcall(args): | |
| """convenient wrapper for Process.Pool""" | |
| function, args = args | |
| return function(*args) | |
| def starstarcall(args): | |
| """convenient wrapper for Process.Pool""" | |
| function, args = args | |
| return function(**args) | |