| import asyncio |
| import json |
| import time |
|
|
| import ray |
| from datasets import load_dataset |
|
|
| from lagent.distributed.ray_serve import AsyncAgentRayActor |
| from lagent.llms import INTERNLM2_META |
| from lagent.llms.lmdeploy_wrapper import AsyncLMDeployPipeline |
|
|
| ray.init() |
|
|
| |
|
|
| |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| model = dict( |
| type=AsyncLMDeployPipeline, |
| path='internlm/internlm2_5-7b-chat', |
| meta_template=INTERNLM2_META, |
| tp=1, |
| top_k=1, |
| temperature=1.0, |
| stop_words=['<|im_end|>', '<|action_end|>'], |
| max_new_tokens=1024, |
| ) |
|
|
| |
| print('-' * 80, 'interpreter', '-' * 80) |
| ds = load_dataset('lighteval/MATH', split='test') |
| problems = [item['problem'] for item in ds.select(range(5000))] |
|
|
| coder = dict( |
| type='lagent.agents.stream.AsyncMathCoder', |
| llm=model, |
| interpreter=dict(type='AsyncIPythonInterpreter', max_kernels=300), |
| ) |
| tic = time.time() |
|
|
| actor1 = AsyncAgentRayActor(coder.copy(), num_gpus=1) |
| actor2 = AsyncAgentRayActor(coder.copy(), num_gpus=1) |
| corots = [ |
| actor1(query, session_id=i) |
| for i, query in enumerate(problems[:len(problems) // 2]) |
| ] |
| corots += [ |
| actor2(query, session_id=i) |
| for i, query in enumerate(problems[len(problems) // 2:]) |
| ] |
| results = loop.run_until_complete(asyncio.gather(*corots)) |
|
|
| print('-' * 120) |
| print(f'time elapsed: {time.time() - tic}') |
| all_step = ray.get([ |
| actor1.agent_actor.get_steps.remote(i) for i in range(len(problems) // 2) |
| ]) |
| all_step += ray.get([ |
| actor2.agent_actor.get_steps.remote(i) |
| for i in range(len(problems[len(problems) // 2:])) |
| ]) |
|
|
| with open('./tmp_1.json', 'w') as f: |
| json.dump(all_step, f, ensure_ascii=False, indent=4) |
|
|