A First Look at Distributed PyTorch
Important
See [github] for the related source code.
Importing the required modules
    import os
    import time
    import torch
    import torch.distributed as dist
Environment initialization
    # 1. Read rank/world_size from the environment
    #    (set by torchrun; with mpirun the launch script has to export them)
    rank = int(os.environ.get("RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    # 2. Device setup
    device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
    if torch.cuda.is_available():
        torch.cuda.set_device(device)

    # 3. Initialize the process group
    backend = "nccl" if torch.cuda.is_available() else "gloo"
    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
Collective communication operations
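Before running the collectives on real devices, it can help to see what each one computes. The following is a minimal pure-Python sketch that imitates the semantics of all-reduce, reduce-scatter and all-gather on plain lists, using the same inputs as this section (`[0, 1, 2, 3] + rank` and `arange(world_size) + rank`). The helper names `all_reduce_sum`, `reduce_scatter_sum` and `all_gather` are hypothetical and not part of PyTorch, and `world_size = 4` is an assumption chosen to match the single-node run.

```python
from typing import List


def all_reduce_sum(inputs: List[List[float]]) -> List[List[float]]:
    """Every rank ends up with the element-wise sum over all ranks."""
    total = [sum(column) for column in zip(*inputs)]
    return [list(total) for _ in inputs]


def reduce_scatter_sum(inputs: List[List[float]]) -> List[List[float]]:
    """Rank r ends up with only the r-th chunk of the element-wise sum."""
    world_size = len(inputs)
    total = [sum(column) for column in zip(*inputs)]
    chunk = len(total) // world_size
    return [total[r * chunk:(r + 1) * chunk] for r in range(world_size)]


def all_gather(inputs: List[List[float]]) -> List[List[float]]:
    """Every rank ends up with the concatenation of all ranks' inputs."""
    gathered = [value for chunk in inputs for value in chunk]
    return [list(gathered) for _ in inputs]


world_size = 4  # assumption: one node with 4 processes

# Per-rank inputs, indexed by rank (simulated here, no process group involved)
all_reduce_inputs = [[float(i + rank) for i in range(4)] for rank in range(world_size)]
reduce_scatter_inputs = [[float(i + rank) for i in range(world_size)] for rank in range(world_size)]

reduced = all_reduce_sum(all_reduce_inputs)
scattered = reduce_scatter_sum(reduce_scatter_inputs)
gathered = all_gather(scattered)

print(reduced[0])   # [6.0, 10.0, 14.0, 18.0]
print(scattered)    # [[6.0], [10.0], [14.0], [18.0]]
print(gathered[0])  # [6.0, 10.0, 14.0, 18.0]
```

With these inputs, a reduce-scatter followed by an all-gather leaves every rank with the same values as a single all-reduce, which is why the all-gather step in this section reuses the reduce-scatter output as its input.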
AllReduce
    ### All-reduce
    dist.barrier()  # Waits for all processes to get to this point (in this case, for print statements)

    data = torch.tensor([0., 1, 2, 3], device=device) + rank  # Both input and output

    print(f"Rank {rank} [before all-reduce]: {data}", flush=True)
    dist.all_reduce(tensor=data, op=dist.ReduceOp.SUM, async_op=False)  # Modifies tensor in place
    print(f"Rank {rank} [after all-reduce]: {data}", flush=True)
ReduceScatter
    ### Reduce-scatter
    dist.barrier()

    input_ts = torch.arange(world_size, dtype=torch.float32, device=device) + rank  # Input
    # output_ts = torch.empty(1, device=device)  # Allocate output
    output_ts = torch.zeros(1, device=device)  # Allocate output

    print(f"Rank {rank} [before reduce-scatter]: input = {input_ts}, output = {output_ts}", flush=True)
    dist.reduce_scatter_tensor(output=output_ts, input=input_ts, op=dist.ReduceOp.SUM, async_op=False)
    print(f"Rank {rank} [after reduce-scatter]: input = {input_ts}, output = {output_ts}", flush=True)
AllGather
    ### All-gather
    dist.barrier()

    input_ts = output_ts  # Input is the output of reduce-scatter
    # output_ts = torch.empty(world_size, device=device)  # Allocate output
    output_ts = torch.zeros(world_size, device=device)  # Allocate output

    print(f"Rank {rank} [before all-gather]: input = {input_ts}, output = {output_ts}", flush=True)
    dist.all_gather_into_tensor(output_tensor=output_ts, input_tensor=input_ts, async_op=False)
    print(f"Rank {rank} [after all-gather]: input = {input_ts}, output = {output_ts}", flush=True)
Results
Output on a single node with 4 GPUs:

Output on two nodes with 4 GPUs each:

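In the two-node run, `rank` and `local_rank` no longer coincide, which is why the device is selected from `LOCAL_RANK` rather than `RANK`. The sketch below prints one plausible layout. It assumes the launcher numbers global ranks contiguously per node (`rank = node_rank * gpus_per_node + local_rank`), which is the usual arrangement but is not confirmed for this particular run. `rank_layout` is a hypothetical helper, not a PyTorch API.

```python
from typing import Dict, List, Union


def rank_layout(num_nodes: int, gpus_per_node: int) -> List[Dict[str, Union[int, str]]]:
    """Enumerate (node, rank, local_rank, device) under a contiguous-per-node numbering."""
    layout = []
    for node_rank in range(num_nodes):
        for local_rank in range(gpus_per_node):
            layout.append({
                "node": node_rank,
                "rank": node_rank * gpus_per_node + local_rank,  # assumed numbering
                "local_rank": local_rank,
                "device": f"cuda:{local_rank}",  # device index is per node
            })
    return layout


layout = rank_layout(num_nodes=2, gpus_per_node=4)
for entry in layout:
    print(entry)
```

Under this assumption, global rank 5 lives on the second node and uses `cuda:1`. Indexing the device by the global rank would request a `cuda:5` that does not exist on a 4-GPU node.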
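Bandwidth benchmark
Creating the tensor
    # Create tensor
    # num_elements (the number of float32 elements to all-reduce) must be defined by the caller;
    # device is the one selected during environment initialization
    data = torch.randn(num_elements, device=device)
Warmup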
    # Warmup
    dist.all_reduce(tensor=data, op=dist.ReduceOp.SUM, async_op=False)
    if torch.cuda.is_available():
        torch.cuda.synchronize()  # Wait for CUDA kernels to finish
    dist.barrier()  # Wait for all the processes to get here
Timing
    # Perform all-reduce
    start_time = time.time()
    dist.all_reduce(tensor=data, op=dist.ReduceOp.SUM, async_op=False)
    if torch.cuda.is_available():
        torch.cuda.synchronize()  # Wait for CUDA kernels to finish
    dist.barrier()  # Wait for all the processes to get here
    end_time = time.time()

    def render_duration(duration: float) -> str:
        # Format a duration in seconds as us, ms or s
        if duration < 1e-3:
            return f"{duration * 1e6:.2f}us"
        if duration < 1:
            return f"{duration * 1e3:.2f}ms"
        return f"{duration:.2f}s"

    duration = end_time - start_time
    print(f"[all_reduce] Rank {rank}: all_reduce(world_size={world_size}, num_elements={num_elements}) took {render_duration(duration)}", flush=True)
Computing the bandwidth
    # Measure the effective bandwidth
    dist.barrier()
    size_bytes = data.element_size() * data.numel()
    sent_bytes = size_bytes * 2 * (world_size - 1)  # 2x because send + receive, world_size-1 steps in all-reduce
    total_duration = world_size * duration
    bandwidth = sent_bytes / total_duration
    print(f"[all_reduce] Rank {rank}: all_reduce measured bandwidth = {round(bandwidth / 1024**3)} GB/s", flush=True)
Results
Output on a single node with 4 GPUs:

Note that the theoretical maximum bandwidth is 200 GB/s. The output of nvidia-smi nvlink -s is shown below:

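When comparing the measured figure with the 200 GB/s limit, two details of the bandwidth computation are worth checking. First, `sent_bytes / (world_size * duration)` simplifies to `size_bytes / duration * 2 * (world_size - 1) / world_size`, the per-rank traffic of a ring-style all-reduce divided by the elapsed time. Second, the benchmark divides by `1024**3`, so the printed number is in GiB/s even though it is labelled GB/s. The sketch below works through both points. The tensor size and duration are made-up illustration values, not measurements from this article.

```python
import math


def all_reduce_bandwidth(size_bytes: int, world_size: int, duration: float) -> float:
    """Bandwidth in bytes/s, computed the same way as the benchmark in this article."""
    sent_bytes = size_bytes * 2 * (world_size - 1)
    total_duration = world_size * duration
    return sent_bytes / total_duration


def per_rank_bandwidth(size_bytes: int, world_size: int, duration: float) -> float:
    """Algebraically simplified form: per-rank traffic divided by elapsed time."""
    return (size_bytes / duration) * 2 * (world_size - 1) / world_size


# Hypothetical inputs for illustration only
size_bytes = 4 * 100 * 1024**2  # 100 Mi float32 elements
world_size = 4
duration = 0.01                 # seconds

bw = all_reduce_bandwidth(size_bytes, world_size, duration)
same = math.isclose(bw, per_rank_bandwidth(size_bytes, world_size, duration))

print(f"formulas agree: {same}")
print(f"{bw / 1024**3:.2f} GiB/s")  # what the benchmark prints (before rounding)
print(f"{bw / 1e9:.2f} GB/s")       # decimal units, as hardware specs usually quote
```

The two units differ by about 7%, so a value printed by the benchmark should be multiplied by roughly 1.074 before it is set against a limit quoted in decimal GB/s.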
Cleanup
    # Cleanup
    dist.barrier()
    dist.destroy_process_group()
https://llm-tech.com.cn/posts/pytorch-distributed/