import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
from descriptions import basic_texts, descriptions
def generate_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size):
"""
Generate data parallel groups based on the provided parallelism parameters.
"""
assert world_size % (pipeline_model_parallel_size * tensor_model_parallel_size * context_parallel_size) == 0, "world_size must be divisible by the product of pipeline_model_parallel_size, tensor_model_parallel_size, and context_parallel_size"
data_parallel_group_ranks = []
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(context_parallel_size * tensor_model_parallel_size):
ranks = range(
start_rank + j, end_rank, context_parallel_size * tensor_model_parallel_size
)
data_parallel_group_ranks.append(list(ranks))
return data_parallel_group_ranks
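# Worked example (a sketch of the expected output, traced by hand from the loops above):
# with world_size=16, tensor_model_parallel_size=2, pipeline_model_parallel_size=2, context_parallel_size=1,
# the data parallel groups are [[0, 2, 4, 6], [1, 3, 5, 7], [8, 10, 12, 14], [9, 11, 13, 15]],
# i.e. ranks that hold the same model shard and average gradients with each other.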
def generate_context_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size):
"""
Generate data parallel groups considering context parallelism.
"""
assert world_size % (pipeline_model_parallel_size * tensor_model_parallel_size * context_parallel_size) == 0, "world_size must be divisible by the product of pipeline_model_parallel_size, tensor_model_parallel_size, and context_parallel_size"
all_data_parallel_group_ranks_with_cp = []
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(tensor_model_parallel_size):
ranks_with_cp = range(start_rank + j, end_rank, tensor_model_parallel_size)
all_data_parallel_group_ranks_with_cp.append(list(ranks_with_cp))
return all_data_parallel_group_ranks_with_cp
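# Worked example (a sketch of the expected output, traced by hand from the loops above):
# with world_size=16, tensor_model_parallel_size=2, pipeline_model_parallel_size=2, context_parallel_size=2,
# the groups are [[0, 2, 4, 6], [1, 3, 5, 7], [8, 10, 12, 14], [9, 11, 13, 15]].
# Because the stride here is tensor_model_parallel_size only, each group spans both the data parallel
# and the context parallel dimensions (DP x CP), analogous to Megatron-LM's data parallel groups "with CP".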
def generate_tensor_model_parallel_groups(world_size, tensor_model_parallel_size):
"""
Generate model parallel groups based on tensor model parallel size.
"""
assert world_size % tensor_model_parallel_size == 0, "world_size must be divisible by tensor_model_parallel_size"
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
tensor_model_parallel_group_ranks = []
for i in range(num_tensor_model_parallel_groups):
ranks = range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size)
tensor_model_parallel_group_ranks.append(list(ranks))
return tensor_model_parallel_group_ranks
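# Worked example: with world_size=16 and tensor_model_parallel_size=2, the tensor model parallel groups
# are [[0, 1], [2, 3], ..., [14, 15]]: consecutive ranks shard the same layers' weights, which is why
# TP groups are typically kept within a single node for fast intra-node all-reduce.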
def generate_pipeline_parallel_groups(world_size, pipeline_model_parallel_size):
"""
Generate pipeline parallel groups based on pipeline model parallel size.
"""
assert world_size % pipeline_model_parallel_size == 0, "world_size must be divisible by pipeline_model_parallel_size"
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
pipline_parallel_group_ranks = []
for i in range(num_pipeline_model_parallel_groups):
ranks = range(i, world_size, num_pipeline_model_parallel_groups)
pipline_parallel_group_ranks.append(list(ranks))
return pipline_parallel_group_ranks
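# Worked example: with world_size=16 and pipeline_model_parallel_size=2, the pipeline groups are
# [[0, 8], [1, 9], ..., [7, 15]]: each group strides across the two nodes, so successive pipeline
# stages of one model replica live on different nodes and exchange activations point to point.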
def generate_context_parallel_groups(world_size, context_parallel_size, tensor_model_parallel_size, pipeline_model_parallel_size):
"""
Generate context parallel groups based on context parallel size, considering tensor and pipeline model parallel sizes.
"""
assert world_size % (context_parallel_size * tensor_model_parallel_size * pipeline_model_parallel_size) == 0, "world_size must be divisible by the product of context_parallel_size, tensor_model_parallel_size, and pipeline_model_parallel_size"
data_parallel_size = world_size // (tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size)
context_parallel_group_ranks = []
num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size
for i in range(pipeline_model_parallel_size):
for j in range(data_parallel_size):
start_rank = (
i * num_pipeline_model_parallel_groups
+ j * tensor_model_parallel_size * context_parallel_size
)
end_rank = (
i * num_pipeline_model_parallel_groups
+ (j + 1) * tensor_model_parallel_size * context_parallel_size
)
for k in range(tensor_model_parallel_size):
ranks = range(start_rank + k, end_rank, tensor_model_parallel_size)
context_parallel_group_ranks.append(list(ranks))
return context_parallel_group_ranks
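# Worked example: with world_size=16, context_parallel_size=2, tensor_model_parallel_size=2 and
# pipeline_model_parallel_size=2, the context parallel groups are
# [[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]:
# ranks in one group share the same TP/PP/DP coordinates and split the sequence dimension between them.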
def plot_parallel_groups(title="Parallel Groups", dp_groups=None, tp_groups=None, pp_groups=None, cp_groups=None):
    # Initialize a figure
    fig, ax = plt.subplots(figsize=(8, 6))
    # Define the spacing between blocks and their size
    block_size = 700  # Size of the blocks in the scatter plot
    spacing = 1.5  # Spacing multiplier between blocks
    if cp_groups is None:
        cp_offset_x = 0
        cp_offset_y = 0
        tp_offset_x = 0.2
        tp_offset_y = -0.2
        if tp_groups:
            pp_offset_x = 0.4
            pp_offset_y = -0.4
        else:
            pp_offset_x = 0.2
            pp_offset_y = -0.2
    else:
        cp_offset_x = 0.2
        cp_offset_y = -0.2
        tp_offset_x = 0.4
        tp_offset_y = -0.4
        if tp_groups:
            pp_offset_x = 0.6
            pp_offset_y = -0.6
        else:
            pp_offset_x = 0.4
            pp_offset_y = -0.4
    # Adjust the grid layout to map GPU ranks from top-left to bottom-right
    num_cols = 4  # Number of columns in the grid
    x_positions = np.tile(np.arange(num_cols), num_cols) * spacing
    y_positions = np.repeat(np.arange(num_cols), num_cols)[::-1] * spacing  # Reverse to start from top
    dp_colors = plt.cm.tab20(np.linspace(0, 1, len(dp_groups)))
    # Use tab20b to improve color separation between groups
    if tp_groups is not None:
        tp_colors = plt.cm.tab20b(np.linspace(0, 1, len(tp_groups)))
    # If more colors are needed, tab20b and tab20c can be combined
    if pp_groups is not None:
        pp_colors = plt.cm.tab20c(np.linspace(0, 1, len(pp_groups)))
    if cp_groups is not None:
        cp_colors = plt.cm.tab20c(np.linspace(0, 1, len(cp_groups)))
    if cp_groups is not None:
        for group_idx, group in enumerate(cp_groups):
            for rank in group:
                x = x_positions[rank % (num_cols * num_cols)] + cp_offset_x
                y = y_positions[rank % (num_cols * num_cols)] + cp_offset_y
                ax.scatter(x, y, s=block_size, color=cp_colors[group_idx], edgecolor='black', zorder=5, marker='s')
                ax.text(x, y, f'CP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
    for group_idx, group in enumerate(dp_groups):
        for rank in group:
            x = x_positions[rank % (num_cols * num_cols)]
            y = y_positions[rank % (num_cols * num_cols)]
            ax.scatter(x, y, s=block_size, color=dp_colors[group_idx], edgecolor='black', zorder=5, marker='>')
            ax.text(x, y, f'DP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
    if tp_groups is not None:
        for group_idx, group in enumerate(tp_groups):
            for rank in group:
                x = x_positions[rank % (num_cols * num_cols)] + tp_offset_x
                y = y_positions[rank % (num_cols * num_cols)] + tp_offset_y
                ax.scatter(x, y, s=block_size, color=tp_colors[group_idx], edgecolor='black', zorder=5, marker='p')
                ax.text(x, y, f'TP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
    if pp_groups is not None:
        for group_idx, group in enumerate(pp_groups):
            for rank in group:
                x = x_positions[rank % (num_cols * num_cols)] + pp_offset_x
                y = y_positions[rank % (num_cols * num_cols)] + pp_offset_y
                ax.scatter(x, y, s=block_size, color=pp_colors[group_idx], edgecolor='black', zorder=5, marker='h')
                ax.text(x, y, f'PP{rank}', ha='center', va='center', color='white', fontsize=8, zorder=6, fontweight='bold')
    # Draw a separating line between Node0 and Node1
    mid_y_position = np.max(y_positions) / 2
    ax.axhline(y=mid_y_position, color='black', linestyle='-', linewidth=2, zorder=0)
    # Add Node labels
    ax.text(-spacing, max(y_positions) / 4, 'Node1', verticalalignment='center', fontsize=12)
    ax.text(-spacing, 3 * max(y_positions) / 4, 'Node0', verticalalignment='center', fontsize=12)
    # Adjusting the appearance
    ax.set_aspect('equal')  # Keep the aspect ratio square
    ax.axis('off')  # Turn off the axis
    plt.title(title, pad=30)
    return fig
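# Minimal standalone usage sketch (assumes the fixed 2x8 GPU layout used throughout this app; not
# executed here, shown only to illustrate how the plotting helper composes with the group generators):
#   dp = generate_data_parallel_groups(16, 2, 2, 1)
#   tp = generate_tensor_model_parallel_groups(16, 2)
#   fig = plot_parallel_groups("DP+TP Groups", dp_groups=dp, tp_groups=tp)
#   fig.savefig("dp_tp_groups.png")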
# Gradio interface setup
def create_interface():
    def update_plot(parallel_group_type, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size, unused_text):
        world_size = 16  # Fixed world size for 2 machines with 8 GPUs each
        description = descriptions.get(parallel_group_type, "Invalid parallel group type")
        # Initialize groups to None
        data_groups = tp_groups = pp_groups = cp_groups = None
        if "CP" in parallel_group_type or parallel_group_type == 'Context Parallel':
            cp_groups = generate_context_parallel_groups(world_size, context_parallel_size, tensor_model_parallel_size, pipeline_model_parallel_size)
            if "DP" in parallel_group_type:
                data_groups = generate_context_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size)
        else:
            if "DP" in parallel_group_type or parallel_group_type == 'Data Parallel':
                data_groups = generate_data_parallel_groups(world_size, tensor_model_parallel_size, pipeline_model_parallel_size, context_parallel_size)
        if parallel_group_type in ['Tensor Model Parallel', 'DP+TP', 'DP+TP+PP', 'CP+DP+TP', 'CP+DP+TP+PP']:
            tp_groups = generate_tensor_model_parallel_groups(world_size, tensor_model_parallel_size)
        if parallel_group_type in ['Pipeline Parallel', 'DP+PP', 'DP+TP+PP', 'CP+DP+PP', 'CP+DP+TP+PP']:
            pp_groups = generate_pipeline_parallel_groups(world_size, pipeline_model_parallel_size)
        # Prepare text description for display
        groups_list_str = ""
        if data_groups:
            groups_list_str += "Data Parallel Groups:\n"
            groups_list_str += "\n".join([f"Data Group {idx + 1}: {group}" for idx, group in enumerate(data_groups)])
            groups_list_str += "\n--------------------------------------\n"
        if tp_groups:
            groups_list_str += "Tensor Model Parallel Groups:\n"
            groups_list_str += "\n".join([f"Tensor Group {idx + 1}: {group}" for idx, group in enumerate(tp_groups)])
            groups_list_str += "\n--------------------------------------\n"
        if pp_groups:
            groups_list_str += "Pipeline Model Parallel Groups:\n"
            groups_list_str += "\n".join([f"Pipeline Group {idx + 1}: {group}" for idx, group in enumerate(pp_groups)])
            groups_list_str += "\n--------------------------------------\n"
        if cp_groups:
            groups_list_str += "Context Parallel Groups:\n"
            groups_list_str += "\n".join([f"Context Group {idx + 1}: {group}" for idx, group in enumerate(cp_groups)])
            groups_list_str += "\n--------------------------------------\n"
        text_to_display = f"==========Parallel Groups Display==========\n\n{groups_list_str}\n\n{description}"
        # Generate the figure with the parallel groups
        fig = plot_parallel_groups(f"{parallel_group_type} Groups", data_groups if data_groups else [], tp_groups=tp_groups, pp_groups=pp_groups, cp_groups=cp_groups)
        return fig, text_to_display
    iface = gr.Interface(
        fn=update_plot,
        inputs=[
            gr.Dropdown(['Data Parallel', 'Tensor Model Parallel', 'Pipeline Parallel', 'Context Parallel',
                         'DP+TP', 'DP+PP', 'DP+TP+PP',
                         'CP+DP', 'CP+DP+TP', 'CP+DP+PP', 'CP+DP+TP+PP'], label="Parallel Group Type"),
            gr.Slider(1, 8, step=1, label="Tensor Model Parallel Size"),
            gr.Slider(1, 8, step=1, label="Pipeline Model Parallel Size"),
            gr.Slider(1, 8, step=1, label="Context Parallel Size"),
            gr.Textbox(basic_texts, interactive=False)
        ],
        outputs=[
            "plot",
            "text"
        ],
        title="Megatron-LM Parallel Group Visualization",
        description="Select parallel sizes and types to visualize different parallel groups with distinct colors. This includes combinations of Data Parallel (DP), Tensor Model Parallel (TP), Pipeline Parallel (PP), and Context Parallel (CP). Note that the data parallel size is computed automatically from world_size (fixed at 16 here) together with tensor_model_parallel_size, pipeline_model_parallel_size, and context_parallel_size.",
        live=True
    )
    return iface
# Create and launch the interface
iface = create_interface()
iface.launch(share=True)