lvwerra HF staff commited on
Commit
e906818
·
verified ·
1 Parent(s): 1da7c70
assets/images/a0_all_gather.gif ADDED
assets/images/a0_barrier.png ADDED

Git LFS Details

  • SHA256: eaac3499c92dc9b38b3984cf8ac9e23fdbcb79931fa367f62bbe564eb07bcb1a
  • Pointer size: 130 Bytes
  • Size of remote file: 61 kB
assets/images/a0_broadcast.png ADDED

Git LFS Details

  • SHA256: ddeda70dbd64341c246a1a8027dc660103ba8d865ed14e778add1aec312f641c
  • Pointer size: 130 Bytes
  • Size of remote file: 82.4 kB
assets/images/a0_gather_allgather.png ADDED

Git LFS Details

  • SHA256: b188c5b29bff91b15716f2c7f674fb58a1d032790c0f58a0d95cb62c552e5da1
  • Pointer size: 131 Bytes
  • Size of remote file: 124 kB
assets/images/a0_general.png ADDED

Git LFS Details

  • SHA256: 62de1730ba9147c297661a26525ec080046738589733b5591279699a830a00d2
  • Pointer size: 130 Bytes
  • Size of remote file: 71.2 kB
assets/images/a0_reduce_allreduce.png ADDED

Git LFS Details

  • SHA256: 17db77e4f49fdbc3667055fbaf6489723a72a2a28abde3fad2a399de64ab1046
  • Pointer size: 131 Bytes
  • Size of remote file: 164 kB
assets/images/a0_reduce_scatter.gif ADDED
assets/images/a0_scatter_reducescatter.png ADDED

Git LFS Details

  • SHA256: 30f82f3f320968ccf830b5148c7a780d102a3e2c124d3a3654ac546887532b09
  • Pointer size: 131 Bytes
  • Size of remote file: 140 kB
dist/assets/images/a0_all_gather.gif ADDED
dist/assets/images/a0_barrier.png ADDED

Git LFS Details

  • SHA256: eaac3499c92dc9b38b3984cf8ac9e23fdbcb79931fa367f62bbe564eb07bcb1a
  • Pointer size: 130 Bytes
  • Size of remote file: 61 kB
dist/assets/images/a0_broadcast.png ADDED

Git LFS Details

  • SHA256: ddeda70dbd64341c246a1a8027dc660103ba8d865ed14e778add1aec312f641c
  • Pointer size: 130 Bytes
  • Size of remote file: 82.4 kB
dist/assets/images/a0_gather_allgather.png ADDED

Git LFS Details

  • SHA256: b188c5b29bff91b15716f2c7f674fb58a1d032790c0f58a0d95cb62c552e5da1
  • Pointer size: 131 Bytes
  • Size of remote file: 124 kB
dist/assets/images/a0_general.png ADDED

Git LFS Details

  • SHA256: 62de1730ba9147c297661a26525ec080046738589733b5591279699a830a00d2
  • Pointer size: 130 Bytes
  • Size of remote file: 71.2 kB
dist/assets/images/a0_reduce_allreduce.png ADDED

Git LFS Details

  • SHA256: 17db77e4f49fdbc3667055fbaf6489723a72a2a28abde3fad2a399de64ab1046
  • Pointer size: 131 Bytes
  • Size of remote file: 164 kB
dist/assets/images/a0_reduce_scatter.gif ADDED
dist/assets/images/a0_scatter_reducescatter.png ADDED

Git LFS Details

  • SHA256: 30f82f3f320968ccf830b5148c7a780d102a3e2c124d3a3654ac546887532b09
  • Pointer size: 131 Bytes
  • Size of remote file: 140 kB
dist/index.html CHANGED
@@ -2693,22 +2693,384 @@
2693
 
2694
 
2695
  <h2>Appendix</h2>
2696
-
2697
  <h3>A0: Parallel Programming Crash Course</h3>
2698
 
 
 
 
 
 
 
 
 
 
 
 
 
2699
  <h4>Broadcast</h4>
2700
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2701
  <h4>Reduce & AllReduce</h4>
2702
 
2703
- <h4>A quick focus on Ring AllReduce</h4>
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2704
 
 
 
 
 
 
 
 
 
 
 
 
 
2705
  <h4>Gather & AllGather </h4>
2706
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2707
  <h4>Scatter & ReduceScatter</h4>
2708
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2709
  <h4>Barrier</h4>
2710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2711
  <h4>NCCL: NVIDIA Collective Communications Library</h4>
 
 
 
 
 
 
 
 
 
 
 
 
 
2712
 
2713
  <h3>A1: Distributed Training Profiling</h3>
2714
 
 
2693
 
2694
 
2695
  <h2>Appendix</h2>
2696
+
2697
  <h3>A0: Parallel Programming Crash Course</h3>
2698
 
2699
+
2700
+ <p>Throughout the blogpost we scale LLM training from one to hundreds of GPUs. This will require the communication and synchronization of weights, gradients, and data between all the machines. There’s a set of distributed patterns to achieve exactly that called <strong><em>collective operations</em></strong>. In this section we’ll do a small crash course of all the operations like <em>Broadcast, AllReduce, Scatter</em> and more. Let’s dive in!</p>
2701
+
2702
+ <p>The general setup is that we have a number of independent nodes which could be CPU cores, GPUs, or compute nodes. Each performs some computation and then we want to communicate the result or parts of it to the other nodes for the next computation step (t+1).</p>
2703
+
2704
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_general.png" style="width: 400px" /></p>
2705
+
2706
+
2707
+ <p>Maybe we need to send the result from one node to all other nodes, or we need to sum all the intermediate results from each node to report the overall result. Usually, there is one node with an elevated status that plays a central role, here denoted with <code>root</code> that is the target or source of some operations. Let’s start with one of the simplest primitives: a broadcast operation.</p>
2708
+
2709
+
2710
+
2711
  <h4>Broadcast</h4>
2712
 
2713
+ <p>A very common pattern is that you have some data on Node 1 and you want to share it with all the other nodes so they can do some computation with the data. The broadcast operation does just that:</p>
2714
+
2715
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_broadcast.png" style="width: 400px" /></p>
2716
+
2717
+
2718
+ <p>Collective operations are natively provided by PyTorch so we can easily write a small example that demonstrates how broadcasting works. We first need to initialize a process group with <code>dist.initi_process_group</code> which sets up the communication backend (we’ll talk about NCCL later), it determines how many workers (aka nodes) exists and assigns a rank to each one (which we can get with <code>dist.get_rank</code>). Finally, it establishes a connection between the workers.</p>
2719
+
2720
+ <p>To showcase the <code>dist.broadcast</code> operation, let's create a tensor with non-zero values on <code>rank=0</code> and tensors full of zeros on the other workers. We then distribute the <code>rank=0</code> tensor to all other ranks with <code>dist.broadcast(tensor, src=0)</code> :</p>
2721
+
2722
+ <d-code block language="python">
2723
+ import torch
2724
+ import torch.distributed as dist
2725
+
2726
+ def init_process():
2727
+ dist.init_process_group(backend='nccl')
2728
+ torch.cuda.set_device(dist.get_rank())
2729
+
2730
+ def example_broadcast():
2731
+ if dist.get_rank() == 0:
2732
+ tensor = torch.tensor([1, 2, 3, 4, 5], dtype=torch.float32).cuda()
2733
+ else:
2734
+ tensor = torch.zeros(5, dtype=torch.float32).cuda()
2735
+ print(f"Before broadcast on rank {dist.get_rank()}: {tensor}")
2736
+ dist.broadcast(tensor, src=0)
2737
+ print(f"After broadcast on rank {dist.get_rank()}: {tensor}")
2738
+
2739
+ init_process()
2740
+ example_broadcats()
2741
+ </d-code>
2742
+
2743
+
2744
+ <p>You can run the above script with <code>torchrun --nproc_per_node=3 dist_op.py</code> (you’ll need 3 GPUs for this or change <code>nproc_per_node</code> accordingly) and you should see the following output:</p>
2745
+
2746
+ <d-code block language="python">
2747
+ Before broadcast on rank 0: tensor([1., 2., 3., 4., 5.], device='cuda:0')
2748
+ Before broadcast on rank 1: tensor([0., 0., 0., 0., 0.], device='cuda:1')
2749
+ Before broadcast on rank 2: tensor([0., 0., 0., 0., 0.], device='cuda:2')
2750
+
2751
+ After broadcast on rank 0: tensor([1., 2., 3., 4., 5.], device='cuda:0')
2752
+ After broadcast on rank 1: tensor([1., 2., 3., 4., 5.], device='cuda:1')
2753
+ After broadcast on rank 2: tensor([1., 2., 3., 4., 5.], device='cuda:2')
2754
+ </d-code>
2755
+
2756
+ <p>Great, seems like it works as expected. Note that the rank messages can be printed out of order as we have no control over which print statement is executed first (we ordered them here for readability). Now let’s move on to the Reduce and AllReduce patterns! </p>
2757
+
2758
+
2759
+
2760
  <h4>Reduce & AllReduce</h4>
2761
 
2762
+ <p>Reduce patterns are among the most fundamental patterns in distributed data processing. The idea is that you want to combine the data present on each node through a function <code>f()</code> which can be for instance summation or averaging. In the Reduce paradigm the result is sent to the root node only, whereas in the AllReduce case the result is broadcasted to all nodes:</p>
2763
+
2764
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_reduce_allreduce.png" style="width: 1000px" /></p>
2765
+
2766
+
2767
+ <p>Of course no magic “free flying” node that can perform this operation and generally each node does a partial computation in a ring or tree structure of the nodes. Here is a simple example: let’s say we need to compute a sum of numbers on each nodes and our nodes are connected in a ring pattern. The first node sends its number to a neighbour which adds its number to the received number before forwarding it to the next neighbour. At the end of a round along the ring of nodes, the first node will receive the total sum.</p>
2768
+
2769
+ <p>Here’s the code to run a simple Reduce operation summing the tensors, we specify the operation to use with <code>op=dist.ReduceOp.SUM</code> (you can find more information on the supported operations in the <a href="https://pytorch.org/docs/stable/distributed.html#torch.distributed.ReduceOp">Pytorch docs</a>):</p>
2770
+
2771
+ <d-code block language="python">
2772
+ def example_reduce():
2773
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2774
+ print(f"Before reduce on rank {dist.get_rank()}: {tensor}")
2775
+ dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM)
2776
+ print(f"After reduce on rank {rank}: {tensor}")
2777
+
2778
+ init_process()
2779
+ example_reduce()
2780
+ </d-code>
2781
+
2782
+ <p>Note that in the Reduce operation only the tensor on the <code>dst</code> node is updated:</p>
2783
+
2784
+ <d-code block language="python">
2785
+ Before reduce on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2786
+ Before reduce on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2787
+ Before reduce on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2788
+
2789
+ After reduce on rank 0: tensor([6., 6., 6., 6., 6.], device='cuda:0')
2790
+ After reduce on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2791
+ After reduce on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2792
+ </d-code>
2793
+
2794
+ <p>Similarly we can perform an AllReduce (we don’t need to specify a destination in this case):</p>
2795
+
2796
+ <d-code block language="python">
2797
+ def example_all_reduce():
2798
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2799
+ print(f"Before all_reduce on rank {dist.get_rank()}: {tensor}")
2800
+ dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
2801
+ print(f"After all_reduce on rank {dist.get_rank()}: {tensor}")
2802
+
2803
+ init_process()
2804
+ example_all_reduce()
2805
+ </d-code>
2806
+
2807
+ <p>In this case the result is available on all nodes:</p>
2808
 
2809
+ <d-code block language="python">
2810
+ Before all_reduce on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2811
+ Before all_reduce on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2812
+ Before all_reduce on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2813
+
2814
+ After all_reduce on rank 0: tensor([6., 6., 6., 6., 6.], device='cuda:0')
2815
+ After all_reduce on rank 1: tensor([6., 6., 6., 6., 6.], device='cuda:1')
2816
+ After all_reduce on rank 2: tensor([6., 6., 6., 6., 6.], device='cuda:2')
2817
+ </d-code>
2818
+
2819
+ <p>Now let’s turn to our next distributed communication operation. In many real cases, each node individually perform many complex computations and we need to share the final results among nodes. Gather and AllGather are the operations we want to use in this case. Let’s take a look!</p>
2820
+
2821
  <h4>Gather & AllGather </h4>
2822
 
2823
+ <p>Gather and AllGather are quite similar to the Broadcast in that they allow distributing data among node without modification. The main difference to Broadcast is that there is not one value we need to share from one node to all other nodes but each node has an individual chunk of data that we want to either gather all data on one node (in case of Gather) or gather all data on all nodes (in the case of AllGather). A picture being worth 1000 words, let’s take a look:</p>
2824
+
2825
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_gather_allgather.png" style="width: 1000px" /></p>
2826
+
2827
+ <p>Note that the dashed lines indicate that some data actually doesn’t move at all (since it’s already present on the node).</p>
2828
+
2829
+ <p>In the case of the gather operation we need to prepare a container objects where the gathered tensors can be stored in this example the <code>gather_list</code>:</p>
2830
+
2831
+ <d-code block language="python">
2832
+ def example_gather():
2833
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2834
+ if dist.get_rank() == 0:
2835
+ gather_list = [
2836
+ torch.zeros(5, dtype=torch.float32).cuda()
2837
+ for _ in range(dist.get_world_size())
2838
+ ]
2839
+ else:
2840
+ gather_list = None
2841
+ print(f"Before gather on rank {dist.get_rank()}: {tensor}")
2842
+ dist.gather(tensor, gather_list, dst=0)
2843
+ if dist.get_rank() == 0:
2844
+ print(f"After gather on rank 0: {gather_list}")
2845
+
2846
+ init_process()
2847
+ example_gather()
2848
+ </d-code>
2849
+
2850
+ <p>And we see that the `gather_list` indeed contains the tensors of all ranks:</p>
2851
+
2852
+ <d-code block language="python">
2853
+ Before gather on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2854
+ Before gather on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2855
+ Before gather on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2856
+
2857
+ After gather on rank 0: [tensor([1., 1., 1., 1., 1.], device='cuda:0'),
2858
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2859
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2860
+ </d-code>
2861
+
2862
+ <p>The only thing we need to change for the AllGather example is that every node will need a placeholder for the results:</p>
2863
+
2864
+ <d-code block language="python">
2865
+ def example_all_gather():
2866
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2867
+ gather_list = [
2868
+ torch.zeros(5, dtype=torch.float32).cuda()
2869
+ for _ in range(dist.get_world_size())
2870
+ ]
2871
+ print(f"Before all_gather on rank {dist.get_rank()}: {tensor}")
2872
+ dist.all_gather(gather_list, tensor)
2873
+ print(f"After all_gather on rank {dist.get_rank()}: {gather_list}")
2874
+
2875
+ init_process()
2876
+ example_all_gather()
2877
+ </d-code>
2878
+
2879
+ <p>And indeed we can see that now each node has all the data:</p>
2880
+
2881
+ <d-code block language="python">
2882
+ Before all_gather on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2883
+ Before all_gather on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2884
+ Before all_gather on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2885
+
2886
+ After all_gather on rank 0: [tensor([1., 1., 1., 1., 1.], device='cuda:0'),
2887
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2888
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2889
+ After all_gather on rank 1: [tensor([1., 1., 1., 1., 1.], device='cuda:1'),
2890
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2891
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2892
+ After all_gather on rank 2: [tensor([1., 1., 1., 1., 1.], device='cuda:2'),
2893
+ tensor([2., 2., 2., 2., 2.], device='cuda:2'),
2894
+ tensor([3., 3., 3., 3., 3.], device='cuda:2')]
2895
+ </d-code>
2896
+
2897
+ <p>Now what about the inverse of a gather? In this case we would have all the data on one node and want to distribute/slice it among node, possibly with some intermediate processing? We can use the Scatter, or in the case of an operation in between a Reduce Scatter pattern:</p>
2898
+
2899
  <h4>Scatter & ReduceScatter</h4>
2900
 
2901
+ <p>As the name subtly suggests, the goal of the Scatter operation is to take data on one node and distribute slices of it to all other nodes. It’s thus different from the Broadcast operation which copy data without slicing and it’s the logical the inverse of the Gather operation.</p>
2902
+
2903
+ <p>The ReduceScatter pattern is slightly more complex: imagine you apply an operation like in the Reduce case but instead of moving the result to just one node we also distribute it evenly to all nodes:</p>
2904
+
2905
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_scatter_reducescatter.png" style="width: 1000px" /></p>
2906
+
2907
+ <p>The Scatter operation is written in code as the opposite of the Gather: instead of preparing a list of tensors as target we prepare the source data as a list of tensors we want to distribute. We also need to specify the <code>src</code>:</p>
2908
+
2909
+ <d-code block language="python">
2910
+ def example_scatter():
2911
+ if dist.get_rank() == 0:
2912
+ scatter_list = [
2913
+ torch.tensor([i + 1] * 5, dtype=torch.float32).cuda()
2914
+ for i in range(dist.get_world_size())
2915
+ ]
2916
+ print(f"Rank 0: Tensor to scatter: {scatter_list}")
2917
+ else:
2918
+ scatter_list = None
2919
+ tensor = torch.zeros(5, dtype=torch.float32).cuda()
2920
+ print(f"Before scatter on rank {dist.get_rank()}: {tensor}")
2921
+ dist.scatter(tensor, scatter_list, src=0)
2922
+ print(f"After scatter on rank {dist.get_rank()}: {tensor}")
2923
+
2924
+ init_process()
2925
+ example_scatter()
2926
+ </d-code>
2927
+
2928
+ <p>As a result we can see how the empty tensors got filled with the contents of the <code>scatter_list</code></p>
2929
+
2930
+ <d-code block language="python">
2931
+ Rank 0: Tensor to scatter: [tensor([1., 1., 1., 1., 1.], device='cuda:0'),
2932
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2933
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2934
+ Before scatter on rank 0: tensor([0., 0., 0., 0., 0.], device='cuda:0')
2935
+ Before scatter on rank 1: tensor([0., 0., 0., 0., 0.], device='cuda:1')
2936
+ Before scatter on rank 2: tensor([0., 0., 0., 0., 0.], device='cuda:2')
2937
+
2938
+ After scatter on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2939
+ After scatter on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2940
+ After scatter on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2941
+ </d-code>
2942
+
2943
+ <p>Let’s create more interesting data to demonstrate the ReduceScatter logic: on each node we create a list of 2-elements vector on each node with a power exponent and an offset function of the node rank (it’s a bit hard to imagine so just look below for an example): </p>
2944
+
2945
+ <d-code block language="python">
2946
+ def example_reduce_scatter():
2947
+ rank = dist.get_rank()
2948
+ world_size = dist.get_world_size()
2949
+ input_tensor = [
2950
+ torch.tensor([(rank + 1) * i for i in range(1, 3)], dtype=torch.float32).cuda()**(j+1)
2951
+ for j in range(world_size)
2952
+ ]
2953
+ output_tensor = torch.zeros(2, dtype=torch.float32).cuda()
2954
+ print(f"Before ReduceScatter on rank {rank}: {input_tensor}")
2955
+ dist.reduce_scatter(output_tensor, input_tensor, op=dist.ReduceOp.SUM)
2956
+ print(f"After ReduceScatter on rank {rank}: {output_tensor}")
2957
+
2958
+ init_process()
2959
+ example_reduce_scatter()
2960
+ </d-code>
2961
+
2962
+ <p>Let’s print the pattern of data that we created. We also immediately see the ReduceScatter pattern: the first rank received the sum of the first tensor from each node, and the second rank contains the sum of the second tensor on each node and so on:</p>
2963
+
2964
+ <d-code block language="python">
2965
+ Before ReduceScatter on rank 0: [tensor([1., 2.], device='cuda:0'),
2966
+ tensor([1., 4.], device='cuda:0'),
2967
+ tensor([1., 8.], device='cuda:0')]
2968
+ Before ReduceScatter on rank 1: [tensor([2., 4.], device='cuda:1'),
2969
+ tensor([ 4., 16.], device='cuda:1'),
2970
+ tensor([ 8., 64.], device='cuda:1')]
2971
+ Before ReduceScatter on rank 2: [tensor([3., 6.], device='cuda:2'),
2972
+ tensor([ 9., 36.], device='cuda:2'),
2973
+ tensor([ 27., 216.], device='cuda:2')]
2974
+
2975
+ After ReduceScatter on rank 0: tensor([ 6., 12.], device='cuda:0')
2976
+ After ReduceScatter on rank 1: tensor([14., 56.], device='cuda:1')
2977
+ After ReduceScatter on rank 2: tensor([ 36., 288.], device='cuda:2')
2978
+ </d-code>
2979
+
2980
+
2981
+ <p>Let's have a quick look at a common implementation of AllReduce that uses ReduceScatter and AllGather: Ring AllReduce.</p>
2982
+
2983
+ <h4>A quick focus on Ring AllReduce</h4>
2984
+
2985
+ <p><strong><em>Ring AllReduce</em></strong> is one specific implementation of AllReduce, optimized for scalability. Rather than all devices communicating with each other directly, which could create communication bottlenecks, Ring All-Reduce can be broken down into two key steps: ReduceScatter and AllGather. Here's how it works:</p>
2986
+
2987
+ <ol>
2988
+ <li><strong>ReduceScatter</strong></li>
2989
+ <ul>
2990
+ <li>Each device splits its data (e.g., gradients) into chunks and sends one chunk to its neighbour. Simultaneously, each device receives a chunk from its other neighbour.</li>
2991
+ <li>As each device receives a chunk, it adds (reduces) its corresponding chunk to the received one.</li>
2992
+ <li>This process continues around the ring until each device holds a partially reduced chunk, representing a sum of the gradients across all devices for that chunk.</li>
2993
+ </ul>
2994
+ <li><strong>AllGather</strong></li>
2995
+ <ul>
2996
+ <li>Now, each device needs to collect the fully reduced chunks from other devices.</li>
2997
+ <li>The devices start sending their reduced chunks to neighbours.</li>
2998
+ <li>Each device forwards the chunks it receives until every device has all the fully reduced chunks, giving each device the complete, summed-up gradient.</li>
2999
+ </ul>
3000
+ </ol>
3001
+
3002
+ <p>Let’s illustrate this with the following gifs, where we have 5 GPUs, each with a tensor of length 5. The first animation shows the ReduceScatter step, where, at the end, each GPU receives the reduced results for a specific chunk of data (orange rectangle).</p>
3003
+
3004
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_reduce_scatter.gif" style="width: 400px" /></p>
3005
+
3006
+ <p>The next animation shows the AllGather step, where, at the end, each GPU obtains the full results of the AllReduce operation:</p>
3007
+
3008
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_all_gather.gif" style="width: 400px" /></p>
3009
+
3010
+ <p>You may have noticed that each of the <d-math>N</d-math> GPUs sends and receives values <d-math>N-1</d-math> times during both the reduce-scatter and all-gather steps. Each GPU sends <d-math>\frac{K}{N}</d-math> values per transfer, where <d-math>K</d-math> is the total number of values in the array being summed across the GPUs. Therefore, the total amount of data transferred to and from each GPU is <d-math>2 \times (N-1) \times \frac{K}{N}</d-math>. When <d-math>N</d-math> (the number of GPUs) is large, the total amount of data transferred to and from each GPU is approximately <d-math>2 \times K</d-math>, where <d-math>K</d-math> is the total number of parameters.</p>
3011
+
3012
+
3013
+ <p><strong>There are two key things to keep in mind for AllReduce:</strong></p>
3014
+ <ol>
3015
+ <li>The communication cost for AllReduce is approximately <d-math>2xK</d-math> when <d-math>N</d-math> (the number of GPUs) is large.</li>
3016
+ <li>An AllReduce operation can be broken down into a reduce-scatter followed by an all-gather. The communication cost for these two operations is half that of the AllReduce, which is approximately <d-math>K</d-math>.</li>
3017
+ </ol>
3018
+
3019
+ <p>As we can see this implementation can make efficient use of even a limited bandwidth between nodes.</p>
3020
+
3021
+ <p>We now have seen the main building block of distributed operations but before we see them in action let’s have a look at a special operation used for synchronization: the Barrier.</p>
3022
+
3023
  <h4>Barrier</h4>
3024
 
3025
+ <p>The Barrier is a simple operation to synchronize all nodes. A barrier is not lifted until all nodes have reached it. Then only are they allowed to continue with further computations:</p>
3026
+
3027
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_barrier.png" style="width: 400px" /></p>
3028
+
3029
+ <p>We can easily simulate delayed nodes by setting up a different sleep time on each node and see how long it takes for all of them to pass the barrier:</p>
3030
+
3031
+ <d-code block language="python">
3032
+ def example_barrier():
3033
+ rank = dist.get_rank()
3034
+ t_start = time.time()
3035
+ print(f"Rank {rank} sleeps {rank} seconds.")
3036
+ time.sleep(rank) # Simulate different processing times
3037
+ dist.barrier()
3038
+ print(f"Rank {rank} after barrier time delta: {time.time()-t_start:.4f}")
3039
+
3040
+ init_process()
3041
+ example_barrier()
3042
+ </d-code>
3043
+
3044
+ <p>We can see that although the first rank didn’t sleep at all it also took it 2sec to pass the barrier:</p>
3045
+
3046
+ <d-code block language="python">
3047
+ Rank 0 sleeps 0 seconds.
3048
+ Rank 1 sleeps 1 seconds.
3049
+ Rank 2 sleeps 2 seconds.
3050
+
3051
+ Rank 0 after barrier time delta: 2.0025
3052
+ Rank 1 after barrier time delta: 2.0025
3053
+ Rank 2 after barrier time delta: 2.0024
3054
+ </d-code>
3055
+
3056
+ <p>We need to be careful with synchronizing all nodes like this, as this defeat the purpose of parallel independent operations and might thus slow down the whole processing. In many situations it can be just fine if a fast node already starts processing the next job as the fast node could be slower in a next iteration therefore evening out the delay over the whole process.</p>
3057
+
3058
+ <p>Before turning to practical distributed training implementations, let’s first solve a mystery: what the heck is NCCL?</p>
3059
+
3060
  <h4>NCCL: NVIDIA Collective Communications Library</h4>
3061
+
3062
+ <p>When training large models on many GPUs we may sometimes strike gold but we will always encounter nickel (or NCCL 🥁)! What’s is that?</p>
3063
+
3064
+ <p>There are several libraries that implement collective communication and are support by PyTorch: there’s the classic <strong><em>MPI</em></strong> (Message Passing Interface), there’s <strong><em>Gloo</em></strong> by Meta, and finally there is `NCCL` (NVIDIA Collective Communications Library). They all provide similar functionality in terms of collective communication patterns but are optimized for different hardware setups; NCCL is designed to serve GPU-GPU communication efficiently while MPI and Gloo are setup for CPU-CPU or CPU-GPU communication. PyTorch provides a <a href="https://pytorch.org/docs/stable/distributed.html#which-backend-to-use">great guide</a> to decide which one to use:</p>
3065
+
3066
+ <ul>
3067
+ <li>GPU training: use NCCL</li>
3068
+ <li>CPU training: use Gloo</li>
3069
+ </ul>
3070
+
3071
+ <p>There are a few finer points in the decision tree that we leave to the reader to explore in the PyTorch guide referenced above.</p>
3072
+
3073
+ <p>Now that we covered the fundamental operations for distributed training and when you should should be ready to follow the blog post easily.</p>
3074
 
3075
  <h3>A1: Distributed Training Profiling</h3>
3076
 
src/index.html CHANGED
@@ -2693,22 +2693,384 @@
2693
 
2694
 
2695
  <h2>Appendix</h2>
2696
-
2697
  <h3>A0: Parallel Programming Crash Course</h3>
2698
 
 
 
 
 
 
 
 
 
 
 
 
 
2699
  <h4>Broadcast</h4>
2700
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2701
  <h4>Reduce & AllReduce</h4>
2702
 
2703
- <h4>A quick focus on Ring AllReduce</h4>
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2704
 
 
 
 
 
 
 
 
 
 
 
 
 
2705
  <h4>Gather & AllGather </h4>
2706
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2707
  <h4>Scatter & ReduceScatter</h4>
2708
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2709
  <h4>Barrier</h4>
2710
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2711
  <h4>NCCL: NVIDIA Collective Communications Library</h4>
 
 
 
 
 
 
 
 
 
 
 
 
 
2712
 
2713
  <h3>A1: Distributed Training Profiling</h3>
2714
 
 
2693
 
2694
 
2695
  <h2>Appendix</h2>
2696
+
2697
  <h3>A0: Parallel Programming Crash Course</h3>
2698
 
2699
+
2700
+ <p>Throughout the blogpost we scale LLM training from one to hundreds of GPUs. This will require the communication and synchronization of weights, gradients, and data between all the machines. There’s a set of distributed patterns to achieve exactly that called <strong><em>collective operations</em></strong>. In this section we’ll do a small crash course of all the operations like <em>Broadcast, AllReduce, Scatter</em> and more. Let’s dive in!</p>
2701
+
2702
+ <p>The general setup is that we have a number of independent nodes which could be CPU cores, GPUs, or compute nodes. Each performs some computation and then we want to communicate the result or parts of it to the other nodes for the next computation step (t+1).</p>
2703
+
2704
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_general.png" style="width: 400px" /></p>
2705
+
2706
+
2707
+ <p>Maybe we need to send the result from one node to all other nodes, or we need to sum all the intermediate results from each node to report the overall result. Usually, there is one node with an elevated status that plays a central role, here denoted with <code>root</code> that is the target or source of some operations. Let’s start with one of the simplest primitives: a broadcast operation.</p>
2708
+
2709
+
2710
+
2711
  <h4>Broadcast</h4>
2712
 
2713
+ <p>A very common pattern is that you have some data on Node 1 and you want to share it with all the other nodes so they can do some computation with the data. The broadcast operation does just that:</p>
2714
+
2715
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_broadcast.png" style="width: 400px" /></p>
2716
+
2717
+
2718
+ <p>Collective operations are natively provided by PyTorch so we can easily write a small example that demonstrates how broadcasting works. We first need to initialize a process group with <code>dist.initi_process_group</code> which sets up the communication backend (we’ll talk about NCCL later), it determines how many workers (aka nodes) exists and assigns a rank to each one (which we can get with <code>dist.get_rank</code>). Finally, it establishes a connection between the workers.</p>
2719
+
2720
+ <p>To showcase the <code>dist.broadcast</code> operation, let's create a tensor with non-zero values on <code>rank=0</code> and tensors full of zeros on the other workers. We then distribute the <code>rank=0</code> tensor to all other ranks with <code>dist.broadcast(tensor, src=0)</code> :</p>
2721
+
2722
+ <d-code block language="python">
2723
+ import torch
2724
+ import torch.distributed as dist
2725
+
2726
+ def init_process():
2727
+ dist.init_process_group(backend='nccl')
2728
+ torch.cuda.set_device(dist.get_rank())
2729
+
2730
+ def example_broadcast():
2731
+ if dist.get_rank() == 0:
2732
+ tensor = torch.tensor([1, 2, 3, 4, 5], dtype=torch.float32).cuda()
2733
+ else:
2734
+ tensor = torch.zeros(5, dtype=torch.float32).cuda()
2735
+ print(f"Before broadcast on rank {dist.get_rank()}: {tensor}")
2736
+ dist.broadcast(tensor, src=0)
2737
+ print(f"After broadcast on rank {dist.get_rank()}: {tensor}")
2738
+
2739
+ init_process()
2740
+ example_broadcats()
2741
+ </d-code>
2742
+
2743
+
2744
+ <p>You can run the above script with <code>torchrun --nproc_per_node=3 dist_op.py</code> (you’ll need 3 GPUs for this or change <code>nproc_per_node</code> accordingly) and you should see the following output:</p>
2745
+
2746
+ <d-code block language="python">
2747
+ Before broadcast on rank 0: tensor([1., 2., 3., 4., 5.], device='cuda:0')
2748
+ Before broadcast on rank 1: tensor([0., 0., 0., 0., 0.], device='cuda:1')
2749
+ Before broadcast on rank 2: tensor([0., 0., 0., 0., 0.], device='cuda:2')
2750
+
2751
+ After broadcast on rank 0: tensor([1., 2., 3., 4., 5.], device='cuda:0')
2752
+ After broadcast on rank 1: tensor([1., 2., 3., 4., 5.], device='cuda:1')
2753
+ After broadcast on rank 2: tensor([1., 2., 3., 4., 5.], device='cuda:2')
2754
+ </d-code>
2755
+
2756
+ <p>Great, seems like it works as expected. Note that the rank messages can be printed out of order as we have no control over which print statement is executed first (we ordered them here for readability). Now let’s move on to the Reduce and AllReduce patterns! </p>
2757
+
2758
+
2759
+
2760
  <h4>Reduce & AllReduce</h4>
2761
 
2762
+ <p>Reduce patterns are among the most fundamental patterns in distributed data processing. The idea is that you want to combine the data present on each node through a function <code>f()</code> which can be for instance summation or averaging. In the Reduce paradigm the result is sent to the root node only, whereas in the AllReduce case the result is broadcasted to all nodes:</p>
2763
+
2764
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_reduce_allreduce.png" style="width: 1000px" /></p>
2765
+
2766
+
2767
+ <p>Of course no magic “free flying” node that can perform this operation and generally each node does a partial computation in a ring or tree structure of the nodes. Here is a simple example: let’s say we need to compute a sum of numbers on each nodes and our nodes are connected in a ring pattern. The first node sends its number to a neighbour which adds its number to the received number before forwarding it to the next neighbour. At the end of a round along the ring of nodes, the first node will receive the total sum.</p>
2768
+
2769
+ <p>Here’s the code to run a simple Reduce operation summing the tensors, we specify the operation to use with <code>op=dist.ReduceOp.SUM</code> (you can find more information on the supported operations in the <a href="https://pytorch.org/docs/stable/distributed.html#torch.distributed.ReduceOp">Pytorch docs</a>):</p>
2770
+
2771
+ <d-code block language="python">
2772
+ def example_reduce():
2773
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2774
+ print(f"Before reduce on rank {dist.get_rank()}: {tensor}")
2775
+ dist.reduce(tensor, dst=0, op=dist.ReduceOp.SUM)
2776
+ print(f"After reduce on rank {rank}: {tensor}")
2777
+
2778
+ init_process()
2779
+ example_reduce()
2780
+ </d-code>
2781
+
2782
+ <p>Note that in the Reduce operation only the tensor on the <code>dst</code> node is updated:</p>
2783
+
2784
+ <d-code block language="python">
2785
+ Before reduce on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2786
+ Before reduce on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2787
+ Before reduce on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2788
+
2789
+ After reduce on rank 0: tensor([6., 6., 6., 6., 6.], device='cuda:0')
2790
+ After reduce on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2791
+ After reduce on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2792
+ </d-code>
2793
+
2794
+ <p>Similarly we can perform an AllReduce (we don’t need to specify a destination in this case):</p>
2795
+
2796
+ <d-code block language="python">
2797
+ def example_all_reduce():
2798
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2799
+ print(f"Before all_reduce on rank {dist.get_rank()}: {tensor}")
2800
+ dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
2801
+ print(f"After all_reduce on rank {dist.get_rank()}: {tensor}")
2802
+
2803
+ init_process()
2804
+ example_all_reduce()
2805
+ </d-code>
2806
+
2807
+ <p>In this case the result is available on all nodes:</p>
2808
 
2809
+ <d-code block language="python">
2810
+ Before all_reduce on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2811
+ Before all_reduce on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2812
+ Before all_reduce on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2813
+
2814
+ After all_reduce on rank 0: tensor([6., 6., 6., 6., 6.], device='cuda:0')
2815
+ After all_reduce on rank 1: tensor([6., 6., 6., 6., 6.], device='cuda:1')
2816
+ After all_reduce on rank 2: tensor([6., 6., 6., 6., 6.], device='cuda:2')
2817
+ </d-code>
2818
+
2819
+ <p>Now let’s turn to our next distributed communication operation. In many real cases, each node individually perform many complex computations and we need to share the final results among nodes. Gather and AllGather are the operations we want to use in this case. Let’s take a look!</p>
2820
+
2821
  <h4>Gather & AllGather </h4>
2822
 
2823
+ <p>Gather and AllGather are quite similar to the Broadcast in that they allow distributing data among node without modification. The main difference to Broadcast is that there is not one value we need to share from one node to all other nodes but each node has an individual chunk of data that we want to either gather all data on one node (in case of Gather) or gather all data on all nodes (in the case of AllGather). A picture being worth 1000 words, let’s take a look:</p>
2824
+
2825
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_gather_allgather.png" style="width: 1000px" /></p>
2826
+
2827
+ <p>Note that the dashed lines indicate that some data actually doesn’t move at all (since it’s already present on the node).</p>
2828
+
2829
+ <p>In the case of the gather operation we need to prepare a container objects where the gathered tensors can be stored in this example the <code>gather_list</code>:</p>
2830
+
2831
+ <d-code block language="python">
2832
+ def example_gather():
2833
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2834
+ if dist.get_rank() == 0:
2835
+ gather_list = [
2836
+ torch.zeros(5, dtype=torch.float32).cuda()
2837
+ for _ in range(dist.get_world_size())
2838
+ ]
2839
+ else:
2840
+ gather_list = None
2841
+ print(f"Before gather on rank {dist.get_rank()}: {tensor}")
2842
+ dist.gather(tensor, gather_list, dst=0)
2843
+ if dist.get_rank() == 0:
2844
+ print(f"After gather on rank 0: {gather_list}")
2845
+
2846
+ init_process()
2847
+ example_gather()
2848
+ </d-code>
2849
+
2850
+ <p>And we see that the `gather_list` indeed contains the tensors of all ranks:</p>
2851
+
2852
+ <d-code block language="python">
2853
+ Before gather on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2854
+ Before gather on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2855
+ Before gather on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2856
+
2857
+ After gather on rank 0: [tensor([1., 1., 1., 1., 1.], device='cuda:0'),
2858
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2859
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2860
+ </d-code>
2861
+
2862
+ <p>The only thing we need to change for the AllGather example is that every node will need a placeholder for the results:</p>
2863
+
2864
+ <d-code block language="python">
2865
+ def example_all_gather():
2866
+ tensor = torch.tensor([dist.get_rank() + 1] * 5, dtype=torch.float32).cuda()
2867
+ gather_list = [
2868
+ torch.zeros(5, dtype=torch.float32).cuda()
2869
+ for _ in range(dist.get_world_size())
2870
+ ]
2871
+ print(f"Before all_gather on rank {dist.get_rank()}: {tensor}")
2872
+ dist.all_gather(gather_list, tensor)
2873
+ print(f"After all_gather on rank {dist.get_rank()}: {gather_list}")
2874
+
2875
+ init_process()
2876
+ example_all_gather()
2877
+ </d-code>
2878
+
2879
+ <p>And indeed we can see that now each node has all the data:</p>
2880
+
2881
+ <d-code block language="python">
2882
+ Before all_gather on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2883
+ Before all_gather on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2884
+ Before all_gather on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2885
+
2886
+ After all_gather on rank 0: [tensor([1., 1., 1., 1., 1.], device='cuda:0'),
2887
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2888
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2889
+ After all_gather on rank 1: [tensor([1., 1., 1., 1., 1.], device='cuda:1'),
2890
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2891
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2892
+ After all_gather on rank 2: [tensor([1., 1., 1., 1., 1.], device='cuda:2'),
2893
+ tensor([2., 2., 2., 2., 2.], device='cuda:2'),
2894
+ tensor([3., 3., 3., 3., 3.], device='cuda:2')]
2895
+ </d-code>
2896
+
2897
+ <p>Now what about the inverse of a gather? In this case we would have all the data on one node and want to distribute/slice it among node, possibly with some intermediate processing? We can use the Scatter, or in the case of an operation in between a Reduce Scatter pattern:</p>
2898
+
2899
  <h4>Scatter & ReduceScatter</h4>
2900
 
2901
+ <p>As the name subtly suggests, the goal of the Scatter operation is to take data on one node and distribute slices of it to all other nodes. It’s thus different from the Broadcast operation which copy data without slicing and it’s the logical the inverse of the Gather operation.</p>
2902
+
2903
+ <p>The ReduceScatter pattern is slightly more complex: imagine you apply an operation like in the Reduce case but instead of moving the result to just one node we also distribute it evenly to all nodes:</p>
2904
+
2905
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_scatter_reducescatter.png" style="width: 1000px" /></p>
2906
+
2907
+ <p>The Scatter operation is written in code as the opposite of the Gather: instead of preparing a list of tensors as target we prepare the source data as a list of tensors we want to distribute. We also need to specify the <code>src</code>:</p>
2908
+
2909
+ <d-code block language="python">
2910
+ def example_scatter():
2911
+ if dist.get_rank() == 0:
2912
+ scatter_list = [
2913
+ torch.tensor([i + 1] * 5, dtype=torch.float32).cuda()
2914
+ for i in range(dist.get_world_size())
2915
+ ]
2916
+ print(f"Rank 0: Tensor to scatter: {scatter_list}")
2917
+ else:
2918
+ scatter_list = None
2919
+ tensor = torch.zeros(5, dtype=torch.float32).cuda()
2920
+ print(f"Before scatter on rank {dist.get_rank()}: {tensor}")
2921
+ dist.scatter(tensor, scatter_list, src=0)
2922
+ print(f"After scatter on rank {dist.get_rank()}: {tensor}")
2923
+
2924
+ init_process()
2925
+ example_scatter()
2926
+ </d-code>
2927
+
2928
+ <p>As a result we can see how the empty tensors got filled with the contents of the <code>scatter_list</code></p>
2929
+
2930
+ <d-code block language="python">
2931
+ Rank 0: Tensor to scatter: [tensor([1., 1., 1., 1., 1.], device='cuda:0'),
2932
+ tensor([2., 2., 2., 2., 2.], device='cuda:0'),
2933
+ tensor([3., 3., 3., 3., 3.], device='cuda:0')]
2934
+ Before scatter on rank 0: tensor([0., 0., 0., 0., 0.], device='cuda:0')
2935
+ Before scatter on rank 1: tensor([0., 0., 0., 0., 0.], device='cuda:1')
2936
+ Before scatter on rank 2: tensor([0., 0., 0., 0., 0.], device='cuda:2')
2937
+
2938
+ After scatter on rank 0: tensor([1., 1., 1., 1., 1.], device='cuda:0')
2939
+ After scatter on rank 1: tensor([2., 2., 2., 2., 2.], device='cuda:1')
2940
+ After scatter on rank 2: tensor([3., 3., 3., 3., 3.], device='cuda:2')
2941
+ </d-code>
2942
+
2943
+ <p>Let’s create more interesting data to demonstrate the ReduceScatter logic: on each node we create a list of 2-elements vector on each node with a power exponent and an offset function of the node rank (it’s a bit hard to imagine so just look below for an example): </p>
2944
+
2945
+ <d-code block language="python">
2946
+ def example_reduce_scatter():
2947
+ rank = dist.get_rank()
2948
+ world_size = dist.get_world_size()
2949
+ input_tensor = [
2950
+ torch.tensor([(rank + 1) * i for i in range(1, 3)], dtype=torch.float32).cuda()**(j+1)
2951
+ for j in range(world_size)
2952
+ ]
2953
+ output_tensor = torch.zeros(2, dtype=torch.float32).cuda()
2954
+ print(f"Before ReduceScatter on rank {rank}: {input_tensor}")
2955
+ dist.reduce_scatter(output_tensor, input_tensor, op=dist.ReduceOp.SUM)
2956
+ print(f"After ReduceScatter on rank {rank}: {output_tensor}")
2957
+
2958
+ init_process()
2959
+ example_reduce_scatter()
2960
+ </d-code>
2961
+
2962
+ <p>Let’s print the pattern of data that we created. We also immediately see the ReduceScatter pattern: the first rank received the sum of the first tensor from each node, and the second rank contains the sum of the second tensor on each node and so on:</p>
2963
+
2964
+ <d-code block language="python">
2965
+ Before ReduceScatter on rank 0: [tensor([1., 2.], device='cuda:0'),
2966
+ tensor([1., 4.], device='cuda:0'),
2967
+ tensor([1., 8.], device='cuda:0')]
2968
+ Before ReduceScatter on rank 1: [tensor([2., 4.], device='cuda:1'),
2969
+ tensor([ 4., 16.], device='cuda:1'),
2970
+ tensor([ 8., 64.], device='cuda:1')]
2971
+ Before ReduceScatter on rank 2: [tensor([3., 6.], device='cuda:2'),
2972
+ tensor([ 9., 36.], device='cuda:2'),
2973
+ tensor([ 27., 216.], device='cuda:2')]
2974
+
2975
+ After ReduceScatter on rank 0: tensor([ 6., 12.], device='cuda:0')
2976
+ After ReduceScatter on rank 1: tensor([14., 56.], device='cuda:1')
2977
+ After ReduceScatter on rank 2: tensor([ 36., 288.], device='cuda:2')
2978
+ </d-code>
2979
+
2980
+
2981
+ <p>Let's have a quick look at a common implementation of AllReduce that uses ReduceScatter and AllGather: Ring AllReduce.</p>
2982
+
2983
+ <h4>A quick focus on Ring AllReduce</h4>
2984
+
2985
+ <p><strong><em>Ring AllReduce</em></strong> is one specific implementation of AllReduce, optimized for scalability. Rather than all devices communicating with each other directly, which could create communication bottlenecks, Ring All-Reduce can be broken down into two key steps: ReduceScatter and AllGather. Here's how it works:</p>
2986
+
2987
+ <ol>
2988
+ <li><strong>ReduceScatter</strong></li>
2989
+ <ul>
2990
+ <li>Each device splits its data (e.g., gradients) into chunks and sends one chunk to its neighbour. Simultaneously, each device receives a chunk from its other neighbour.</li>
2991
+ <li>As each device receives a chunk, it adds (reduces) its corresponding chunk to the received one.</li>
2992
+ <li>This process continues around the ring until each device holds a partially reduced chunk, representing a sum of the gradients across all devices for that chunk.</li>
2993
+ </ul>
2994
+ <li><strong>AllGather</strong></li>
2995
+ <ul>
2996
+ <li>Now, each device needs to collect the fully reduced chunks from other devices.</li>
2997
+ <li>The devices start sending their reduced chunks to neighbours.</li>
2998
+ <li>Each device forwards the chunks it receives until every device has all the fully reduced chunks, giving each device the complete, summed-up gradient.</li>
2999
+ </ul>
3000
+ </ol>
3001
+
3002
+ <p>Let’s illustrate this with the following gifs, where we have 5 GPUs, each with a tensor of length 5. The first animation shows the ReduceScatter step, where, at the end, each GPU receives the reduced results for a specific chunk of data (orange rectangle).</p>
3003
+
3004
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_reduce_scatter.gif" style="width: 400px" /></p>
3005
+
3006
+ <p>The next animation shows the AllGather step, where, at the end, each GPU obtains the full results of the AllReduce operation:</p>
3007
+
3008
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_all_gather.gif" style="width: 400px" /></p>
3009
+
3010
+ <p>You may have noticed that each of the <d-math>N</d-math> GPUs sends and receives values <d-math>N-1</d-math> times during both the reduce-scatter and all-gather steps. Each GPU sends <d-math>\frac{K}{N}</d-math> values per transfer, where <d-math>K</d-math> is the total number of values in the array being summed across the GPUs. Therefore, the total amount of data transferred to and from each GPU is <d-math>2 \times (N-1) \times \frac{K}{N}</d-math>. When <d-math>N</d-math> (the number of GPUs) is large, the total amount of data transferred to and from each GPU is approximately <d-math>2 \times K</d-math>, where <d-math>K</d-math> is the total number of parameters.</p>
3011
+
3012
+
3013
+ <p><strong>There are two key things to keep in mind for AllReduce:</strong></p>
3014
+ <ol>
3015
+ <li>The communication cost for AllReduce is approximately <d-math>2xK</d-math> when <d-math>N</d-math> (the number of GPUs) is large.</li>
3016
+ <li>An AllReduce operation can be broken down into a reduce-scatter followed by an all-gather. The communication cost for these two operations is half that of the AllReduce, which is approximately <d-math>K</d-math>.</li>
3017
+ </ol>
3018
+
3019
+ <p>As we can see this implementation can make efficient use of even a limited bandwidth between nodes.</p>
3020
+
3021
+ <p>We now have seen the main building block of distributed operations but before we see them in action let’s have a look at a special operation used for synchronization: the Barrier.</p>
3022
+
3023
  <h4>Barrier</h4>
3024
 
3025
+ <p>The Barrier is a simple operation to synchronize all nodes. A barrier is not lifted until all nodes have reached it. Then only are they allowed to continue with further computations:</p>
3026
+
3027
+ <p style="text-align: center"><img alt="image.png" src="/assets/images/a0_barrier.png" style="width: 400px" /></p>
3028
+
3029
+ <p>We can easily simulate delayed nodes by setting up a different sleep time on each node and see how long it takes for all of them to pass the barrier:</p>
3030
+
3031
+ <d-code block language="python">
3032
+ def example_barrier():
3033
+ rank = dist.get_rank()
3034
+ t_start = time.time()
3035
+ print(f"Rank {rank} sleeps {rank} seconds.")
3036
+ time.sleep(rank) # Simulate different processing times
3037
+ dist.barrier()
3038
+ print(f"Rank {rank} after barrier time delta: {time.time()-t_start:.4f}")
3039
+
3040
+ init_process()
3041
+ example_barrier()
3042
+ </d-code>
3043
+
3044
+ <p>We can see that although the first rank didn’t sleep at all it also took it 2sec to pass the barrier:</p>
3045
+
3046
+ <d-code block language="python">
3047
+ Rank 0 sleeps 0 seconds.
3048
+ Rank 1 sleeps 1 seconds.
3049
+ Rank 2 sleeps 2 seconds.
3050
+
3051
+ Rank 0 after barrier time delta: 2.0025
3052
+ Rank 1 after barrier time delta: 2.0025
3053
+ Rank 2 after barrier time delta: 2.0024
3054
+ </d-code>
3055
+
3056
+ <p>We need to be careful with synchronizing all nodes like this, as this defeat the purpose of parallel independent operations and might thus slow down the whole processing. In many situations it can be just fine if a fast node already starts processing the next job as the fast node could be slower in a next iteration therefore evening out the delay over the whole process.</p>
3057
+
3058
+ <p>Before turning to practical distributed training implementations, let’s first solve a mystery: what the heck is NCCL?</p>
3059
+
3060
  <h4>NCCL: NVIDIA Collective Communications Library</h4>
3061
+
3062
+ <p>When training large models on many GPUs we may sometimes strike gold but we will always encounter nickel (or NCCL 🥁)! What’s is that?</p>
3063
+
3064
+ <p>There are several libraries that implement collective communication and are support by PyTorch: there’s the classic <strong><em>MPI</em></strong> (Message Passing Interface), there’s <strong><em>Gloo</em></strong> by Meta, and finally there is `NCCL` (NVIDIA Collective Communications Library). They all provide similar functionality in terms of collective communication patterns but are optimized for different hardware setups; NCCL is designed to serve GPU-GPU communication efficiently while MPI and Gloo are setup for CPU-CPU or CPU-GPU communication. PyTorch provides a <a href="https://pytorch.org/docs/stable/distributed.html#which-backend-to-use">great guide</a> to decide which one to use:</p>
3065
+
3066
+ <ul>
3067
+ <li>GPU training: use NCCL</li>
3068
+ <li>CPU training: use Gloo</li>
3069
+ </ul>
3070
+
3071
+ <p>There are a few finer points in the decision tree that we leave to the reader to explore in the PyTorch guide referenced above.</p>
3072
+
3073
+ <p>Now that we covered the fundamental operations for distributed training and when you should should be ready to follow the blog post easily.</p>
3074
 
3075
  <h3>A1: Distributed Training Profiling</h3>
3076