A First Look at Distributed PyTorch

Important

See [github] for the related source code.

Importing the required modules

import os
import time
import torch
import torch.distributed as dist

Environment initialization

# 1. Read rank/world_size from environment (set by launchers such as torchrun)
rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))
local_rank = int(os.environ.get("LOCAL_RANK", 0))
# 2. Device setup
device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu")
if torch.cuda.is_available():
    torch.cuda.set_device(device)
# 3. Initialize process group
backend = "nccl" if torch.cuda.is_available() else "gloo"
dist.init_process_group(backend=backend, rank=rank, world_size=world_size)
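The device is chosen from LOCAL_RANK rather than RANK because RANK is global across all nodes, while each node only exposes its own GPUs as cuda:0 through cuda:(P-1). The sketch below assumes the common contiguous layout in which, with P processes per node, RANK = node_rank * P + LOCAL_RANK; torchrun typically assigns ranks this way when every node runs the same number of processes. The helpers global_rank and describe_layout are hypothetical and only model the arithmetic.

```python
from typing import List, Tuple


def global_rank(node_rank: int, local_rank: int, nproc_per_node: int) -> int:
    """Global rank under an assumed uniform, contiguous rank layout."""
    if not 0 <= local_rank < nproc_per_node:
        raise ValueError("local_rank must be in [0, nproc_per_node)")
    return node_rank * nproc_per_node + local_rank


def describe_layout(nnodes: int, nproc_per_node: int) -> List[Tuple[int, int, int]]:
    """Return (RANK, node_rank, LOCAL_RANK) for every process in the job."""
    return [
        (global_rank(node, local, nproc_per_node), node, local)
        for node in range(nnodes)
        for local in range(nproc_per_node)
    ]


layout = describe_layout(nnodes=2, nproc_per_node=4)
for rank, node, local in layout:
    # The CUDA device index is the local rank, never the global rank.
    print(f"RANK={rank} node={node} LOCAL_RANK={local} -> cuda:{local}")
```

For a two-node job with 4 GPUs per node, rank 5 would live on the second node and use cuda:1; using cuda:{rank} there would point at a device that does not exist.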

Collective communication operations

AllReduce

### All-reduce
dist.barrier() # Waits for all processes to get to this point (in this case, for print statements)
data = torch.tensor([0., 1, 2, 3], device=device) + rank # Both input and output
print(f"Rank {rank} [before all-reduce]: {data}", flush=True)
dist.all_reduce(tensor=data, op=dist.ReduceOp.SUM, async_op=False) # Modifies tensor in place
print(f"Rank {rank} [after all-reduce]: {data}", flush=True)
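The semantics of all_reduce with ReduceOp.SUM can be checked without any GPUs. The pure-Python model below (simulate_all_reduce_sum is a hypothetical helper, not a PyTorch API) reproduces the example above: rank r starts with [0, 1, 2, 3] + r, and afterwards every rank holds the same elementwise sum. In general element i becomes world_size * i + world_size * (world_size - 1) / 2, so with 4 ranks every rank should print [6, 10, 14, 18].

```python
from typing import List


def simulate_all_reduce_sum(per_rank: List[List[float]]) -> List[List[float]]:
    """Pure-Python model of all_reduce(op=SUM): every rank ends with the elementwise sum."""
    length = len(per_rank[0])
    assert all(len(t) == length for t in per_rank), "all ranks must pass same-shaped tensors"
    total = [sum(t[i] for t in per_rank) for i in range(length)]
    # Each rank gets its own copy of the result (the real API writes it in place).
    return [list(total) for _ in per_rank]


world_size = 4
# Mirrors data = torch.tensor([0., 1, 2, 3]) + rank from the article.
inputs = [[0.0 + r, 1.0 + r, 2.0 + r, 3.0 + r] for r in range(world_size)]
outputs = simulate_all_reduce_sum(inputs)
for r in range(world_size):
    print(f"Rank {r}: before={inputs[r]} after={outputs[r]}")
```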

ReduceScatter

### Reduce-scatter
dist.barrier()
input_ts = torch.arange(world_size, dtype=torch.float32, device=device) + rank # Input
# output_ts = torch.empty(1, device=device) # Allocate output
output_ts = torch.zeros(1, device=device) # Allocate output
print(f"Rank {rank} [before reduce-scatter]: input = {input_ts}, output = {output_ts}", flush=True)
dist.reduce_scatter_tensor(output=output_ts, input=input_ts, op=dist.ReduceOp.SUM, async_op=False)
print(f"Rank {rank} [after reduce-scatter]: input = {input_ts}, output = {output_ts}", flush=True)
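reduce_scatter_tensor first reduces the full-length inputs elementwise and then hands rank r only chunk r of the result, which is why the output above is allocated with world_size times fewer elements than the input. A pure-Python model of that behaviour, assuming the same inputs as the code above (simulate_reduce_scatter_sum is a hypothetical helper):

```python
from typing import List


def simulate_reduce_scatter_sum(per_rank: List[List[float]]) -> List[List[float]]:
    """Pure-Python model of reduce_scatter_tensor(op=SUM).

    Every rank contributes a full-length input; rank r keeps only chunk r
    of the elementwise sum.
    """
    world_size = len(per_rank)
    length = len(per_rank[0])
    assert all(len(t) == length for t in per_rank), "inputs must have the same shape"
    assert length % world_size == 0, "input length must be divisible by world_size"
    chunk = length // world_size
    total = [sum(t[i] for t in per_rank) for i in range(length)]
    return [total[r * chunk:(r + 1) * chunk] for r in range(world_size)]


world_size = 4
# Mirrors input_ts = torch.arange(world_size) + rank from the article.
inputs = [[float(i + r) for i in range(world_size)] for r in range(world_size)]
outputs = simulate_reduce_scatter_sum(inputs)
for r in range(world_size):
    print(f"Rank {r}: input={inputs[r]} output={outputs[r]}")
```

With 4 ranks, rank r ends up with the single value 4 * r + 6.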

AllGather

### All-gather
dist.barrier()
input_ts = output_ts # Input is the output of reduce-scatter
# output_ts = torch.empty(world_size, device=device) # Allocate output
output_ts = torch.zeros(world_size, device=device) # Allocate output
print(f"Rank {rank} [before all-gather]: input = {input_ts}, output = {output_ts}", flush=True)
dist.all_gather_into_tensor(output_tensor=output_ts, input_tensor=input_ts, async_op=False)
print(f"Rank {rank} [after all-gather]: input = {input_ts}, output = {output_ts}", flush=True)
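Feeding the reduce-scatter output into all-gather reproduces an all-reduce: ring all-reduce is built from exactly these two phases. With 4 ranks the inputs arange(world_size) + rank and [0, 1, 2, 3] + rank coincide, so the all-gather step above should print the same [6, 10, 14, 18] as the all-reduce step. The sketch checks that equivalence in pure Python (simulate_all_gather is a hypothetical helper, not a PyTorch API).

```python
from typing import List


def simulate_all_gather(per_rank: List[List[float]]) -> List[List[float]]:
    """Model of all_gather_into_tensor: every rank gets all inputs concatenated in rank order."""
    gathered = [x for t in per_rank for x in t]
    return [list(gathered) for _ in per_rank]


world_size = 4
inputs = [[float(i + r) for i in range(world_size)] for r in range(world_size)]

# All-reduce result computed directly: the elementwise sum over ranks.
total = [sum(t[i] for t in inputs) for i in range(world_size)]

# Reduce-scatter: rank r keeps element r of the sum; then all-gather the shards.
shards = [[total[r]] for r in range(world_size)]
gathered = simulate_all_gather(shards)
for r in range(world_size):
    print(f"Rank {r}: shard={shards[r]} gathered={gathered[r]}")
```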

Results

Running on a single node with 4 GPUs produces the following output:

Running on two nodes with 4 GPUs each produces the following output:

Bandwidth benchmark

Creating the tensor

# Create tensor
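num_elements = 100 * 1024**2  # Hypothetical payload size: 100 * 1024**2 float32 elements (400 MiB)
data = torch.randn(num_elements, device=device)  # `device` comes from the initialization step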

Warmup

# Warmup
dist.all_reduce(tensor=data, op=dist.ReduceOp.SUM, async_op=False)
torch.cuda.synchronize() # Wait for CUDA kernels to finish
dist.barrier() # Wait for all the processes to get here

Timing the operation

# Perform all-reduce
start_time = time.time()
dist.all_reduce(tensor=data, op=dist.ReduceOp.SUM, async_op=False)
torch.cuda.synchronize() # Wait for CUDA kernels to finish
dist.barrier() # Wait for all the processes to get here
end_time = time.time()
def render_duration(duration: float) -> str:
    if duration < 1e-3:
        return f"{duration * 1e6:.2f}us"
    if duration < 1:
        return f"{duration * 1e3:.2f}ms"
    return f"{duration:.2f}s"
duration = end_time - start_time
print(f"[all_reduce] Rank {rank}: all_reduce(world_size={world_size}, num_elements={num_elements}) took {render_duration(duration)}", flush=True)
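The measurement above times a single call, so one noisy sample can skew the result. A common refinement, sketched here as an assumption rather than the article's method, is to run a few untimed warmup calls, time several iterations with time.perf_counter (monotonic and higher resolution than time.time), and report the median. time_op is a hypothetical helper; the sync callable stands in for whatever must block until the work is done.

```python
import statistics
import time
from typing import Callable


def time_op(op: Callable[[], None], sync: Callable[[], None],
            warmup: int = 1, iters: int = 10) -> float:
    """Return the median wall-clock seconds of `op` after `warmup` untimed calls.

    `sync` should block until the work launched by `op` has finished; in the
    article's setting that would be torch.cuda.synchronize() followed by
    dist.barrier().
    """
    for _ in range(warmup):
        op()
        sync()
    samples = []
    for _ in range(iters):
        start = time.perf_counter()
        op()
        sync()
        samples.append(time.perf_counter() - start)
    return statistics.median(samples)


# Stand-alone demo: a small CPU-bound op and a no-op sync.
calls = []
median_s = time_op(lambda: calls.append(sum(range(10_000))),
                   sync=lambda: None, warmup=2, iters=5)
print(f"median: {median_s * 1e6:.2f}us over 5 timed calls")
```

In the distributed script one might pass op=lambda: dist.all_reduce(data) and sync=lambda: (torch.cuda.synchronize(), dist.barrier()). Because all_reduce with SUM writes into data in place, the values grow with each call; that changes the contents but not the amount of data moved.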

Computing the bandwidth

# Measure the effective bandwidth
dist.barrier()
size_bytes = data.element_size() * data.numel()
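# Ring all-reduce: each rank sends 2 * (world_size - 1) chunks of size_bytes / world_size
# (world_size - 1 reduce-scatter steps plus world_size - 1 all-gather steps)
sent_bytes = size_bytes * 2 * (world_size - 1)
total_duration = world_size * duration  # the factor world_size converts size_bytes into the chunk size
bandwidth = sent_bytes / total_duration  # per-rank bytes sent per second
print(f"[all_reduce] Rank {rank}: all_reduce measured bandwidth = {round(bandwidth / 1024**3)} GiB/s", flush=True)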
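The factor 2 * (world_size - 1) / world_size comes from ring all-reduce: with n ranks and an S-byte buffer split into n chunks, each rank sends one chunk per step for n - 1 reduce-scatter steps and n - 1 all-gather steps, i.e. 2(n - 1)S/n bytes. Dividing by the elapsed time t gives 2(n - 1)S/(n t), which is algebraically the same as sent_bytes / (world_size * duration) above and matches what nccl-tests reports as bus bandwidth for all-reduce. The simulation below assumes a ring algorithm (NCCL may pick a different one, such as a tree), so it models the byte accounting rather than what NCCL actually runs; ring_all_reduce and bus_bandwidth are hypothetical helpers.

```python
from typing import List, Tuple


def ring_all_reduce(per_rank: List[List[float]]) -> Tuple[List[List[float]], List[int]]:
    """Simulate ring all-reduce (SUM); return final buffers and elements sent per rank."""
    n = len(per_rank)
    length = len(per_rank[0])
    assert length % n == 0, "buffer length must be divisible by the number of ranks"
    c = length // n
    bufs = [list(t) for t in per_rank]
    sent = [0] * n

    def chunk(r: int, k: int) -> List[float]:
        return bufs[r][k * c:(k + 1) * c]  # slicing returns a copy

    # Phase 1, reduce-scatter: in step s, rank r sends chunk (r - s) mod n to rank r + 1,
    # which adds it into its own copy of that chunk.
    for s in range(n - 1):
        msgs = [((r - s) % n, chunk(r, (r - s) % n)) for r in range(n)]
        for r, (k, data) in enumerate(msgs):
            dst = (r + 1) % n
            for j, x in enumerate(data):
                bufs[dst][k * c + j] += x
            sent[r] += len(data)

    # Now rank r holds the fully reduced chunk (r + 1) mod n.
    # Phase 2, all-gather: in step s, rank r forwards chunk (r + 1 - s) mod n.
    for s in range(n - 1):
        msgs = [((r + 1 - s) % n, chunk(r, (r + 1 - s) % n)) for r in range(n)]
        for r, (k, data) in enumerate(msgs):
            dst = (r + 1) % n
            bufs[dst][k * c:(k + 1) * c] = data
            sent[r] += len(data)
    return bufs, sent


def bus_bandwidth(size_bytes: float, world_size: int, duration: float) -> float:
    """Same quantity as the article's sent_bytes / (world_size * duration)."""
    return size_bytes * 2 * (world_size - 1) / (world_size * duration)


world_size = 4
inputs = [[float(i + r) for i in range(8)] for r in range(world_size)]
results, sent = ring_all_reduce(inputs)
expected = [sum(t[i] for t in inputs) for i in range(8)]
print(f"elements sent per rank: {sent}")  # 2 * (4 - 1) * 8 / 4 = 12 each
```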

Results

Running on a single node with 4 GPUs produces the following output:

Note that the theoretical maximum bandwidth is 200 GB/s; the output of nvidia-smi nvlink -s is shown below:

Releasing resources

# Cleanup
dist.barrier()
torch.distributed.destroy_process_group()

https://llm-tech.com.cn/posts/pytorch-distributed/
Author
Ming
Published
2026-05-08
License
CC BY-NC-SA 4.0