| The Design of ``verl.single_controller`` |
| ============================================== |
|
|
| Last updated: 05/21/2025. |
|
|
| **Author:**\ `Wang Zhang <https://github.com/zw0610>`__ |
|
|
| Preface |
| ------- |
|
|
| We prepared this document for developers of ``verl``, particularly those |
| interested in understanding or contributing to the |
| ``verl.single_controller`` module. It is not intended for end users, but |
| for contributors seeking to understand the architectural rationale and |
| internal mechanics. |
|
|
| -------------- |
| |
| Origin |
| ------ |
|
|
| The ``single_controller`` module originated from a request I received — |
| to adapt a toy single-process RLHF script into a distributed system with |
| minimal changes, while maintaining ease of debugging. |
|
|
| Common practice — such as using PyTorch’s Distributed Data Parallel
| (DDP) — typically involves wrapping ``nn.Module`` and launching multiple
| processes that execute the same function under different ranks. However,
| this approach presents two main limitations in the context of
| distributed RLHF:
|
| - Difficulty representing the multiple DAGs required by PPO;
| - Difficulty inspecting intermediate tensors during training.
|
|
| To maintain debuggability, we opted for a different approach — breaking |
| the training loop into well-defined stages like ``generate_sequences``, |
| ``compute_advantages``, and so on. |
|
|
| We selected `Ray <https://www.ray.io/>`__ as the initial backend for |
| ``verl`` due to its ability to expose Python class methods as RPC |
| endpoints. However, Ray’s default model only supports **one method call, |
| one RPC**, while training LLMs typically requires coordination across |
| multiple processes. |
|
|
| To hide this one-call-to-many-actors fan-out from users, we introduced
| the following components:
|
|
| - ``WorkerGroup`` – manages a group of remote workers and provides |
| a unified interface for multi-process distributed computation; |
| - ``ResourcePool`` – binds computational resources to worker |
| processes; |
| - ``ClassWithInitArgs`` – enables delayed remote instantiation with
| specified initialization arguments.
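| The delayed remote instantiation idea can be sketched in a few lines of
| plain Python. ``ToyClassWithInitArgs`` and ``Rollout`` below are
| hypothetical stand-ins, not verl's API: the constructor arguments are
| captured eagerly, while instantiation is deferred until each worker
| process is ready.

```python
class ToyClassWithInitArgs:
    """Toy sketch: store a class plus its constructor arguments for later use."""

    def __init__(self, cls, *args, **kwargs):
        self.cls = cls
        self.args = args
        self.kwargs = kwargs

    def instantiate(self):
        # Called later, e.g. once inside each remote worker process.
        return self.cls(*self.args, **self.kwargs)


class Rollout:
    def __init__(self, model_name):
        self.model_name = model_name


cls_with_args = ToyClassWithInitArgs(Rollout, model_name="toy-model")
# Each "worker" gets its own independent instance from the same recipe.
workers = [cls_with_args.instantiate() for _ in range(4)]
```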
|
|
| -------------- |
| |
| A Running Example: ``generate_sequences`` |
| ----------------------------------------- |
|
|
| To illustrate the design, we walk through how the ``generate_sequences`` |
| method in the ``ActorRolloutRefWorker`` class is registered and invoked |
| across distributed workers. |
|
|
| -------------- |
| |
| Step 1: Register with a Decorator |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| |
| The first step is to define the ``generate_sequences`` method and
| decorate it with ``@register``, since it will be called from the driver
| script.
| |
| **Source:** |
| `fsdp_workers.py <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/workers/fsdp_workers.py#L528>`__ |
| |
| .. code:: python |
| |
| class ActorRolloutRefWorker(Worker): |
| ... |
| @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO) |
| def generate_sequences(self, prompts: DataProto): |
| prompts = prompts.to(torch.cuda.current_device()) |
| ... |
| |
| The ``@register`` decorator adds metadata to the ``generate_sequences`` |
| method. Currently, it doesn’t alter functionality, but attaches |
| attributes via a magic key (``MAGIC_ATTR``): |
| |
| **Source:** |
| `decorator.py <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/base/decorator.py#L411>`__ |
| |
| .. code:: python |
| |
| def register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.ALL, blocking=True, materialize_futures=True): |
| ... |
| def decorator(func): |
| @wraps(func) |
| def inner(*args, **kwargs): |
| if materialize_futures: |
| args, kwargs = _materialize_futures(*args, **kwargs) |
| return func(*args, **kwargs) |
| |
| attrs = {"dispatch_mode": dispatch_mode, "execute_mode": execute_mode, "blocking": blocking} |
| setattr(inner, MAGIC_ATTR, attrs) |
| return inner |
| |
| return decorator |
| |
| As the code shows, the values of ``dispatch_mode``, ``execute_mode``,
| and ``blocking`` are attached to the ``generate_sequences`` method.
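| The metadata-attachment pattern can be reproduced standalone. In the
| sketch below, the decorator and the key are simplified stand-ins (string
| modes instead of enums, and ``_magic_attrs`` is an illustrative name,
| not verl's actual constant), but the mechanism is the same: the wrapped
| method behaves normally, and the attributes ride along on the function
| object for later inspection.

```python
from functools import wraps

MAGIC_ATTR = "_magic_attrs"  # illustrative key name, not verl's actual constant


def register(dispatch_mode="ALL_TO_ALL", execute_mode="ALL", blocking=True):
    def decorator(func):
        @wraps(func)
        def inner(*args, **kwargs):
            return func(*args, **kwargs)

        # Attach metadata without changing the call behavior.
        setattr(inner, MAGIC_ATTR, {"dispatch_mode": dispatch_mode,
                                    "execute_mode": execute_mode,
                                    "blocking": blocking})
        return inner

    return decorator


class ToyWorker:
    @register(dispatch_mode="DP_COMPUTE_PROTO")
    def generate_sequences(self, prompts):
        return prompts


# The attributes are readable from the function object; the call is unchanged.
attrs = getattr(ToyWorker.generate_sequences, MAGIC_ATTR)
```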
| |
| -------------- |
|
|
| Step 2: Binding During Initialization |
| ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
|
| These attached attributes are extracted and utilized when
| ``ActorRolloutRefWorker``, wrapped in a ``RayClassWithInitArgs``, is
| passed into a ``RayWorkerGroup``.
|
|
| **Source:** |
| `main_generation.py <https://github.com/volcengine/verl/blob/4ae9a0fdab229f75f080e9478807783ed4c97154/verl/trainer/main_generation.py#L82>`__ |
|
|
| .. code:: python |
|
|
| ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(ActorRolloutRefWorker), config=config, role="rollout") |
| resource_pool = RayResourcePool(process_on_nodes=[config.trainer.n_gpus_per_node] * config.trainer.nnodes) |
| wg = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init) |
|
|
| During the |
| `initialization <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/ray/base.py#L184>`__ |
| of ``RayWorkerGroup``, two key steps occur: |
|
|
| 1. Worker instances (Ray actors) are created: |
| `RayWorkerGroup._init_with_resource_pool <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/ray/base.py#L211>`__ |
| 2. Methods decorated with ``@register`` are bound to ``RayWorkerGroup``: |
| `RayWorkerGroup._bind_worker_method <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/ray/base.py#L214>`__ |
|
|
| .. figure:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/worker_group_init.png?raw=true |
| :alt: initialization_and_binding_of_worker_group |
|
|
| Initialization and binding of the worker group
|
|
| The binding procedure is the heart of ``verl.single_controller``. |
|
|
| **Key function:** |
| `WorkerGroup._bind_worker_method <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/base/worker_group.py#L143>`__ |
|
|
| .. code:: python |
|
|
| def _bind_worker_method(self, user_defined_cls, func_generator): |
| ... |
| for method_name in dir(user_defined_cls): |
| try: |
| method = getattr(user_defined_cls, method_name) |
| assert callable(method) |
| except Exception: |
| continue # Skip properties |
| <<<to be continued 1>>>
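| The ``try``/``except`` around the ``callable`` check is what skips
| properties: accessing a property on the class itself returns a
| non-callable ``property`` object, so the assertion fails and the name is
| skipped. A self-contained illustration (``Example`` is a hypothetical
| class, not from verl):

```python
class Example:
    attr = 42  # plain attribute: not callable, will be skipped

    @property
    def value(self):
        return "computed"  # property object on the class: not callable

    def method(self):
        return "ok"  # ordinary method: callable, will be kept


found = []
for name in dir(Example):
    try:
        candidate = getattr(Example, name)
        assert callable(candidate)
    except Exception:
        continue  # skips `attr` and the `value` property
    found.append(name)
# `method` (plus the usual dunder methods) survives; `value` and `attr` do not.
```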
|
|
| When a method has the ``MAGIC_ATTR``, the attributes set by |
| ``@register`` are extracted: |
|
|
| .. code:: python |
|
|
| <<<continued 1>>>
| if hasattr(method, MAGIC_ATTR): |
| attribute = getattr(method, MAGIC_ATTR) |
| dispatch_mode = attribute["dispatch_mode"] |
| execute_mode = attribute["execute_mode"] |
| blocking = attribute["blocking"] |
|
|
| <<<to be continued 2>>>
|
|
| As shown in the flow chart above, these attributes are fed into
| ``func_generator``. However, ``func_generator`` takes ``method_name``,
| ``dispatch_fn``, ``collect_fn``, ``execute_fn``, and ``blocking``. We
| therefore need to find the ``dispatch_fn`` and ``collect_fn`` associated
| with the ``dispatch_mode`` (``DP_COMPUTE_PROTO``) in
| `DISPATCH_MODE_FN_REGISTRY <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/base/decorator.py#L387>`__:
|
|
| .. code:: python
|
|
| DISPATCH_MODE_FN_REGISTRY = { |
| Dispatch.ONE_TO_ALL: { |
| "dispatch_fn": dispatch_one_to_all, |
| "collect_fn": collect_all_to_all, |
| }, |
| ... |
| Dispatch.DP_COMPUTE_PROTO: { |
| "dispatch_fn": dispatch_dp_compute_data_proto, |
| "collect_fn": collect_dp_compute_data_proto, |
| }, |
| ... |
| } |
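| The lookup itself is a plain dictionary access keyed by the enum value.
| A minimal self-contained sketch of the registry pattern (simplified enum
| and no-op placeholder functions, not verl's actual implementations):

```python
from enum import Enum, auto


class Dispatch(Enum):
    ONE_TO_ALL = auto()
    DP_COMPUTE_PROTO = auto()


def dispatch_one_to_all(worker_group, *args, **kwargs):
    return args, kwargs  # placeholder body for the sketch


def collect_all_to_all(worker_group, output):
    return output  # placeholder body for the sketch


DISPATCH_MODE_FN_REGISTRY = {
    Dispatch.ONE_TO_ALL: {
        "dispatch_fn": dispatch_one_to_all,
        "collect_fn": collect_all_to_all,
    },
}


def get_predefined_dispatch_fn(dispatch_mode):
    # Resolve the dispatch/collect pair registered for a given mode.
    return DISPATCH_MODE_FN_REGISTRY[dispatch_mode]


fns = get_predefined_dispatch_fn(Dispatch.ONE_TO_ALL)
```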
|
|
| Similarly, the ``execute_fn`` is selected according to ``execute_mode``
| and then looked up by name:
|
|
| .. code:: python |
|
|
| <<<continued 2>>>
| # get execute_fn_name |
| execute_mode = get_predefined_execute_fn(execute_mode=execute_mode) |
| wg_execute_fn_name = execute_mode["execute_fn_name"] |
|
|
| # get execute_fn from string |
| try: |
| execute_fn = getattr(self, wg_execute_fn_name) |
| assert callable(execute_fn), "execute_fn must be callable" |
| except Exception: |
| print(f"execute_fn {wg_execute_fn_name} is invalid") |
| raise |
| <<<to be continued 3>>>
|
|
| In the ``generate_sequences`` case:
|
| - ``dispatch_mode = Dispatch.DP_COMPUTE_PROTO``
| - ``dispatch_fn = dispatch_dp_compute_data_proto``
| - ``collect_fn = collect_dp_compute_data_proto``
| - ``execute_fn = RayWorkerGroup.execute_all``
|
|
| ONE_TO_ALL vs. DP_COMPUTE_PROTO
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
|
|
| Each ``dispatch_mode`` is associated with a ``dispatch_fn`` and a
| ``collect_fn``. As the name implies, ``dispatch_fn`` processes the input
| arguments in the ``WorkerGroup`` and generates a batch (list) of input
| arguments, each of which will be fed into one worker attached to the
| ``WorkerGroup``.
|
|
| ``dispatch_fn`` of ``ONE_TO_ALL`` is |
| `dispatch_one_to_all <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/base/decorator.py#L119>`__, |
| which just duplicates all the input arguments into N replicas, where N |
| equals the number of Workers attached to the ``worker_group``: |
| |
| .. code:: python |
|
|
| def dispatch_one_to_all(worker_group, *args, **kwargs): |
| args = tuple([arg] * worker_group.world_size for arg in args) |
| kwargs = {k: [v] * worker_group.world_size for k, v in kwargs.items()} |
| return args, kwargs |
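| A quick usage sketch shows the duplication concretely. The function body
| is repeated from the snippet above so the sketch is self-contained, and
| a ``SimpleNamespace`` stub stands in for the ``worker_group`` (only
| ``world_size`` is needed):

```python
from types import SimpleNamespace


def dispatch_one_to_all(worker_group, *args, **kwargs):
    # Same logic as above: replicate each argument once per worker.
    args = tuple([arg] * worker_group.world_size for arg in args)
    kwargs = {k: [v] * worker_group.world_size for k, v in kwargs.items()}
    return args, kwargs


wg = SimpleNamespace(world_size=4)  # stub, not a real WorkerGroup
args, kwargs = dispatch_one_to_all(wg, "prompt", temperature=0.7)
# args   -> (["prompt", "prompt", "prompt", "prompt"],)
# kwargs -> {"temperature": [0.7, 0.7, 0.7, 0.7]}
```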
|
|
| ``dispatch_fn`` of ``DP_COMPUTE_PROTO`` is
| `dispatch_dp_compute_data_proto <https://github.com/volcengine/verl/blob/c59ab2f4788f9a910836a9f2f53dcdb62dfa314e/verl/single_controller/base/decorator.py#L350>`__,
| which uses ``DataProto.chunk`` to split a large ``DataProto`` into N
| smaller ones, where N equals the world size (the number of workers) of
| the ``worker_group``:
| |
| .. code:: python |
|
|
| def dispatch_dp_compute_data_proto(worker_group, *args, **kwargs): |
| from verl.single_controller.base.worker_group import WorkerGroup |
|
|
| assert isinstance(worker_group, WorkerGroup) |
| # Note: enable auto padding for dp compute DataProto
| splitted_args, splitted_kwargs = _split_args_kwargs_data_proto_with_auto_padding( |
| worker_group.world_size, |
| *args, |
| **kwargs, |
| ) |
| return splitted_args, splitted_kwargs |
|
|
| The ``collect_fn`` follows the same pattern: it processes a batch (list)
| of values returned from all workers of a ``WorkerGroup`` and merges them
| into a single list (as ``collect_all_to_all`` does) or into one large
| ``DataProto`` (as ``collect_dp_compute_data_proto`` does).
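| The split-then-merge round trip can be mimicked with plain Python lists
| standing in for ``DataProto``. Here ``chunk`` and ``collect_concat`` are
| illustrative stand-ins for ``DataProto.chunk`` and
| ``collect_dp_compute_data_proto``, without the auto-padding logic:

```python
def chunk(batch, n):
    # Stand-in for DataProto.chunk: split a list into n near-equal
    # contiguous pieces (earlier pieces absorb the remainder).
    base, extra = divmod(len(batch), n)
    out, start = [], 0
    for i in range(n):
        end = start + base + (1 if i < extra else 0)
        out.append(batch[start:end])
        start = end
    return out


def collect_concat(outputs):
    # Stand-in for collect_dp_compute_data_proto: concatenate the
    # per-worker outputs back into one batch.
    merged = []
    for out in outputs:
        merged.extend(out)
    return merged


chunks = chunk(list(range(10)), 4)
merged = collect_concat(chunks)
# chunks -> [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]; merged -> [0, 1, ..., 9]
```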
|
|
| Finally, a new method is dynamically generated using ``func_generator`` |
| and added to the ``WorkerGroup`` instance: |
|
|
| .. code:: python |
|
|
| <<<continued 3>>>
| # bind a new method to the RayWorkerGroup |
| func = func_generator( |
| self, |
| method_name, |
| dispatch_fn=dispatch_fn, |
| collect_fn=collect_fn, |
| execute_fn=execute_fn, |
| blocking=blocking, |
| ) |
|
|
| try: |
| setattr(self, method_name, func) |
| method_names.append(method_name) |
| except Exception as e: |
| raise ValueError(f"Fail to set method_name {method_name}") from e |
|
|
| This makes the method invocable via the ``WorkerGroup`` interface. |
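| Putting the pieces together, the dispatch, execute, collect, and bind
| steps can be imitated in pure Python, with local objects playing the
| role of Ray actors. Every name here is a hypothetical stand-in
| (simplified ``func_generator`` signature, contiguous-split dispatch,
| serial execution instead of remote calls):

```python
class ToyWorker:
    """Local stand-in for a remote Ray actor."""

    def generate_sequences(self, prompts):
        return [p.upper() for p in prompts]


class ToyWorkerGroup:
    def __init__(self, workers):
        self.workers = workers
        self.world_size = len(workers)

    def execute_all(self, method_name, shards):
        # execute_fn: call the named method on every worker (serially here;
        # with Ray this would be one remote call per actor).
        return [getattr(w, method_name)(shard)
                for w, shard in zip(self.workers, shards)]


def dispatch_split(wg, batch):
    # Contiguous split; assumes len(batch) is divisible by world_size.
    size = len(batch) // wg.world_size
    return [batch[i * size:(i + 1) * size] for i in range(wg.world_size)]


def collect_concat(outputs):
    return [item for out in outputs for item in out]


def func_generator(wg, method_name, dispatch_fn, collect_fn, execute_fn, blocking):
    def func(batch):
        shards = dispatch_fn(wg, batch)            # split input across workers
        outputs = execute_fn(method_name, shards)  # run the per-worker calls
        return collect_fn(outputs)                 # merge the results
    return func


wg = ToyWorkerGroup([ToyWorker(), ToyWorker()])
setattr(wg, "generate_sequences",
        func_generator(wg, "generate_sequences", dispatch_split,
                       collect_concat, wg.execute_all, blocking=True))
result = wg.generate_sequences(["a", "b", "c", "d"])
# result == ["A", "B", "C", "D"], exactly as a single-process call would return
```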
|
|
| -------------- |
| |
| Step 3: Call Chain |
| ~~~~~~~~~~~~~~~~~~ |
| |
| All the machinery above ensures that distributed calls feel identical to |
| single-process ones. In the original single-process script, the code |
| looks like: |
| |
| .. code:: python |
| |
| rollout = Rollout() |
| rollout.generate_sequences(batch) |
| |
| With ``verl``, the multiprocess program becomes: |
| |
| .. code:: python |
| |
| resource_pool = RayResourcePool(process_on_nodes=[4])
| rollout = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=RayClassWithInitArgs(cls=ray.remote(Rollout)))
| rollout.generate_sequences(batch) |
| |
| .. figure:: https://github.com/eric-haibin-lin/verl-community/blob/main/docs/call_generate_sequences.png?raw=true |
| :alt: call_chain_of_generate_sequences |
| |
| Call chain of ``generate_sequences``
| |
| Behind this simple call:
|
| - ``dispatch_fn`` splits the input across workers;
| - ``execute_fn`` performs the actual remote invocation;
| - ``collect_fn`` gathers the results.
| |
| All of this is abstracted away, enabling developers to write distributed |
| code with minimal changes to their existing logic. |
| |
| -------------- |
|
|
| Beyond RL Post-Training: Generalizing ``verl.single_controller`` |
| ---------------------------------------------------------------- |
|
|
| The ``verl.single_controller`` module generalizes well beyond |
| reinforcement learning. It provides a clean abstraction to batch-process |
| remote method calls, with automatic input/output handling. |
|
|
| By minimizing the gap between single-process and multi-process scripts, |
| ``verl.single_controller`` opens the door to distributed computing in |
| broader domains — not limited to RL post-training. |
|
|
| We hope this design inspires more examples and extensions from the |
| community. |
|
|