# radio-embed / ray_pad.py
# freemt
# Update ray_pad.txt ff-en.txt loc
# 33245ac
"""Run ray tests.

Setup note: poetry add protobuf="^3.20.1"
"""
import multiprocessing
import os
from multiprocessing import Pool
from pathlib import Path
# import joblib
import more_itertools as mit
import numpy as np
import ray
from about_time import about_time
from logzero import logger
from radio_embed import radio_embed
# Worker count shared by every parallel backend in this script.
num_cpus = multiprocessing.cpu_count()

# Input corpus, read once at import time.  Each line is stripped of
# surrounding whitespace and lines that end up empty are dropped.
filename = "fangfang-en.txt"
lines = Path(filename).read_text(encoding="utf8").splitlines()
lst = [text for text in (raw.strip() for raw in lines) if text]

# Single-process baseline, kept for reference:
# with about_time() as dur: res1 = radio_embed("\n".join(lst))
# 143.72 s
# ray.init(num_cpus=num_cpus)
def test_pool(func, args_):
    """Map ``func`` over ``args_`` in a process pool of ``num_cpus`` workers.

    Args:
        func: callable applied to each element; it is handed to
            ``Pool.map``, so it runs in worker processes.
        args_: iterable of arguments, one per call.

    Returns:
        The list produced by ``Pool.map``.
    """
    workers = Pool(num_cpus)
    # Leaving the ``with`` block tears the pool down, so no explicit
    # close()/join() is needed.
    with workers:
        return workers.map(func, args_)
# One newline-joined chunk of lines per CPU, so that each worker receives a
# single string to embed.
args = ["\n".join(elm) for elm in mit.divide(num_cpus, lst)]

# Reference timings for the non-ray backends (kept as notes, not executed):
# with about_time() as dur2: res2 = test_pool(radio_embed, args)
# print(dur2.duration)
# 26.5s, about 1/6 of the single-process time
# res2a = np.concatenate(res2)
# np.allclose(res1, res2a, rtol=1e-05, atol=1e-07)
# %timeit ret = joblib.Parallel(n_jobs=num_cpus, backend='loky', verbose=0)(joblib.delayed(radio_embed)(arg) for arg in args)
# 28.1 s ± 1.08 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
# with about_time() as dur4: ret4 = joblib.Parallel(n_jobs=num_cpus, backend='multiprocessing', verbose=0)(joblib.delayed(radio_embed)(arg) for arg in args)
# dur4.duration 28.48s
# ret4a = np.concatenate(ret4)
# assert np.allclose(res1, ret4a, rtol=1e-05, atol=1e-07)

# Set before ray is started.
# NOTE(review): presumably this silences the tokenizers fork/parallelism
# warning inside worker processes -- confirm radio_embed uses tokenizers.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Start a ray instance sized to the machine unless one is already running.
if not ray.is_initialized():
    ray.init(num_cpus=num_cpus)
@ray.remote
def ray_embed(text):
    """Run ``radio_embed`` on ``text`` as a ray remote task.

    Returns whatever ``radio_embed`` returns (described in the original
    note as a d-512 embedding).
    """
    embedding = radio_embed(text)
    return embedding
def _timed_ray_run(cpus, restart=True):
    """Embed every chunk in ``args`` on ray, timing the run.

    Args:
        cpus: CPU count used to size ray when restarting, and the label
            printed next to the elapsed time.
        restart: when true, shut ray down and re-initialise it with ``cpus``
            CPUs before the run; when false, reuse the instance that is
            already running.

    Returns:
        ``(timer, embeddings)``: the ``about_time`` handle for the run and
        the per-chunk results joined with ``np.concatenate``.
    """
    if restart:
        ray.shutdown()
        ray.init(num_cpus=cpus)
    with about_time() as timer:
        pending = [ray_embed.remote(arg) for arg in args]
        chunks = ray.get(pending)
    print(cpus, timer.duration_human)
    return timer, np.concatenate(chunks)


def main():
    """Benchmark ``ray_embed`` over ``args`` with four ray CPU counts.

    Runs with ``num_cpus``, ``num_cpus // 2``, ``2`` and ``num_cpus - 1``
    CPUs, logs whether each later result is ``np.allclose`` to the first one,
    and prints a timing summary at the end.

    Timings recorded in the original notes: num_cpus 32-40 s,
    num_cpus // 2 38-40 s, 2 CPUs 90 s, num_cpus - 1 44 s.
    """
    # Single-process reference (not run here):
    #   radio_embed("\n".join(lst)) took about 137-144 s.

    # Baseline: reuse the ray instance started at import time (num_cpus CPUs).
    base_timer, base = _timed_ray_run(num_cpus, restart=False)
    summary = [(num_cpus, base_timer)]

    # Derived counts are clamped to at least 1: on a 1-CPU host both
    # num_cpus // 2 and num_cpus - 1 are 0, and a ray instance with 0 CPUs
    # can never schedule ray_embed (tasks need 1 CPU by default), so
    # ray.get would block forever.
    variants = (
        ("res6a", max(1, num_cpus // 2)),
        ("res7a", 2),
        ("res8a", max(1, num_cpus - 1)),
    )
    for tag, cpus in variants:
        timer, embeddings = _timed_ray_run(cpus)
        matches = np.allclose(base, embeddings, rtol=1e-05, atol=1e-07)
        logger.info(" res5a allclose to %s? %s", tag, matches)
        summary.append((cpus, timer))

    # Final recap of all four runs, in execution order.
    for cpus, timer in summary:
        print(cpus, timer.duration_human)


# Run the benchmark only when executed as a script, not on import.
if __name__ == "__main__":
    main()