Pytorch 分布式训练
date
May 27, 2022
Last edited time
Mar 27, 2023 08:52 AM
status
Published
slug
Pytorch分布式训练
tags
DL
summary
type
Post
Field
Plat
概述DP使用原理特点DDP使用原理特点Pytorch 分布式使用流程基本概念基本使用流程使用模板TCP 初始化方式Env 初始化方式函数说明通信后端概述如何选择DistributedDataParallel原型功能注意
概述
DP
使用
DP 使用起来非常方便,只需要将原来单卡的 module 用 DP 改成多卡:
原理
DP 基于单机多卡,所有设备都负责计算和训练网络,除此之外, device[0] (并非 GPU 真实标号而是输入参数 device_ids 首位) 还要负责整合梯度,更新参数。图 1 即为 GPU 0 作为 device[0] 的例子。从图中我们可以看出,有三个主要过程:
- 过程一(图中红色部分):各卡分别计算损失和梯度
- 过程二(图中蓝色部分):所有梯度整合到 device[0]
- 过程三(图中绿色部分):device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新
所有卡都并行运算(图中红色),将梯度收集到 device[0](图中浅蓝色)和 device[0] 分享模型参数给其他 GPU(图中绿色)三个主要过程。

特点
- 负载不均衡
- 通信开销大
- 单进程,多线程。存在GIL锁
- 无法使用多机训练
DDP
使用
原理
DDP 也是数据并行,所以每张卡都有模型和输入。我们以多进程多线程为例,每起一个进程,该进程的 device[0] 都会从本地复制模型,如果该进程仍有多线程,就像 DP,模型会从 device[0] 复制到其他设备。
DDP 通过 Reducer 来管理梯度同步。为了提高通讯效率, Reducer 会将梯度归到不同的桶里(按照模型参数的 reverse order, 因为反向传播需要符合这样的顺序),一次归约一个桶。其中桶的大小为参数 bucket_cap_mb 默认为 25,可根据需要调整。下图即为一个例子。
可以看到每个进程里,模型参数都按照倒序放在桶里,每次归约一个桶。

DDP 通过在构建时注册 autograd hook 进行梯度同步。反向传播时,当一个梯度计算好后,相应的 hook 会告诉 DDP 可以用来归约。当一个桶里的梯度都可以了,Reducer 就会启动异步 allreduce 去计算所有进程的平均值。allreduce 异步启动使得 DDP 可以边计算边通信,提高效率。当所有桶都可以了,Reducer 会等所有 allreduce 完成,然后将得到的梯度写到 param.grad。
特点
- 多进程
和 DP 不同, DDP 采用多进程,最推荐的做法是每张卡一个进程从而避免单进程带来的影响。 DDP 同样支持单进程多线程多卡操作,自然也支持多进程多线程,不过需要注意一下 world_size。
- 通信效率
DP 的通信成本随着 GPU 数量线性增长,而 DDP 支持 Ring AllReduce,其通信成本是恒定的,与 GPU 数量无关。
- 同步参数
DP 通过收集梯度到 device[0],在device[0] 更新参数,然后其他设备复制 device[0] 的参数实现各个模型同步;
DDP 通过保证初始状态相同并且改变量也相同(指同步梯度) ,保证模型同步。
Pytorch 分布式使用流程
基本概念
下面是分布式系统中常用的一些概念:
group:
即进程组。默认情况下,只有一个组,一个
job 即为一个组,也即一个 world。当需要进行更加精细的通信时,可以通过
new_group 接口,使用 word 的子集,创建新组,用于集体通信等。world size:
表示全局进程个数。
rank:
表示进程序号,用于进程间通讯,表征进程优先级。
rank = 0 的主机为 master 节点。local_rank:
进程内,
GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1 块 GPU。基本使用流程
Pytorch 中分布式的基本使用流程如下:- 在使用
distributed包的任何其他函数之前,需要使用init_process_group初始化进程组,同时初始化distributed包。
- 如果需要进行小组内集体通信,用
new_group创建子分组
- 创建分布式并行模型
DDP(model, device_ids=device_ids)
- 为数据集创建
Sampler
- 使用启动工具
torch.distributed.launch在每个主机上执行一次脚本,开始训练
- 使用
destory_process_group()销毁进程组
使用模板
下面以
TCP 初始化方式为例,共 3 太主机,每台主机 2 块 GPU,进行分布式训练。TCP 初始化方式
执行方式
说明
- 在
TCP方式中,在init_process_group中必须手动指定以下参数
rank为当前进程的进程号
word_size为当前job的总进程数
init_method内指定tcp模式,且所有进程的ip:port必须一致,设定为主进程的ip:port
- 必须在
rank==0的进程内保存参数。
- 若程序内未根据
rank设定当前进程使用的GPUs,则默认使用全部GPU,且以数据并行的方式使用。
- 每条命令表示一个进程。若已开启的进程未达到
word_size的数量,则所有进程会一直等待
- 每台主机上可以开启多个进程。但是,若未为每个进程分配合适的
GPU,则同机不同进程可能会共用GPU,应该坚决避免这种情况。
- 使用
gloo后端进行GPU训练时,会报错。
- 若每个进程负责多块
GPU,可以利用多GPU进行模型并行。如下所示:
Env 初始化方式
代码
执行方式
说明
- 在
Env方式中,在init_process_group中,无需指定任何参数
- 必须在
rank==0的进程内保存参数。
- 该方式下,使用
torch.distributed.launch在每台主机上,为其创建多进程,其中:
nproc_per_node参数指定为当前主机创建的进程数。一般设定为当前主机的GPU数量
nnodes参数指定当前job包含多少个节点
node_rank指定当前节点的优先级
master_addr和master_port分别指定master节点的ip:port
- 若没有为每个进程合理分配
GPU,则默认使用当前主机上所有的GPU。即使一台主机上有多个进程,也会共用GPU。
- 使用
torch.distributed.launch工具时,将会为当前主机创建nproc_per_node个进程,每个进程独立执行训练脚本。同时,它还会为每个进程分配一个local_rank参数,表示当前进程在当前主机上的编号。例如:rank=2, local_rank=0表示第3个节点上的第1个进程。
- 需要合理利用
local_rank参数,来合理分配本地的GPU资源
- 每条命令表示一个进程。若已开启的进程未达到
word_size的数量,则所有进程会一直等待
函数说明
初始化进程组
- 函数声明
- 函数作用
该函数需要在每个进程中进行调用,用于初始化该进程。在使用分布式时,该函数必须在
distributed 内所有相关函数之前使用。- 参数详解
- backend :指定当前进程要使用的通信后端
小写字符串,支持的通信后端有
gloo,mpi,nccl。建议用nccl。 - init_method : 指定当前进程组初始化方式
可选参数,字符串形式。如果未指定
init_method及store,则默认为env://,表示使用读取环境变量的方式进行初始化。该参数与store互斥。 - rank : 指定当前进程的优先级
int值。表示当前进程的编号,即优先级。如果指定store参数,则必须指定该参数。rank=0的为主进程,即master节点。 - world_size :
该
job中的总进程数。如果指定store参数,则需要指定该参数。 - timeout : 指定每个进程的超时时间
可选参数,
datetime.timedelta对象,默认为30分钟。该参数仅用于Gloo后端。 - store
所有
worker可访问的key/value,用于交换连接 / 地址信息。与init_method互斥。
创建进程组
- 函数声明
- 函数作用
new_group() 函数可用于使用所有进程的任意子集来创建新组。其返回一个分组句柄,可作为 collectives 相关函数的 group 参数 。collectives 是分布式函数,用于特定编程模式中的信息交换。- 参数详解
- ranks:指定新分组内的成员的
ranks列表list,其中每个元素为int型 - timeout:指定该分组进程组内的操作的超时时间
可选参数,
datetime.timedelta对象,默认为30分钟。该参数仅用于Gloo后端。 - backend:指定要使用的通信后端
小写字符串,支持的通信后端有
gloo,nccl,必须与init_process_group()中一致。
获取进程组属性
- 函数声明
- 函数说明
返回给定进程组的
backend。- 参数
- group:要获取信息的进程组。
- 返回
进程组对象,默认为主进程组。如果指定另一个进程组,则调用该函数的进程必须为所指定的进程组的进程。
给定进程组的后端,以小写字符串的形式给出
获取进程组进程优先级
- 函数原型
- 函数说明
返回当前进程的
rank。rank 是赋值给一个分布式进程组组内的每个进程的唯一识别。一般而言,rank 均为从 0 到 world_size 的整数。- 参数
- group
要获取信息的进程组对象,默认为主进程组。如果指定另一个进程组,则调用该函数的进程必须为所指定的进程组的进程。
获取进程组进程数
- 函数原型
- 函数说明
返回当前进程组内的进程数。
- 参数
- group
要获取信息的进程组对象,默认为主进程组。如果指定另一个进程组,则调用该函数的进程必须为所指定的进程组的进程。
获取进程组是否初始化
- 函数原型
- 函数说明
检查默认进程组是否被初始化。
通信后端
概述
使用分布式时,在梯度汇总求平均的过程中,各主机之间需要进行通信。因此,需要指定通信的协议架构等。
torch.distributed 对其进行了封装。torch.distributed 支持 3 种后端,分别为 NCCL,Gloo,MPI。各后端对 CPU / GPU 的支持如下所示:Backen | gloo | ㅤ | mpi | ㅤ | nccl | ㅤ |
Device | CPU | GPU | CPU | GPU | CPU | GPU |
send | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
barrier | ✓ | ✘ | ✓ | ? | ✘ | ✓ |
如何选择
强烈建议:NCCL 是目前最快的后端,且对多进程分布式(Multi-Process Single-GPU)支持极好,可用于单节点以及多节点的分布式训练。 节点即主机。即使是单节点,由于底层机制不同,distributed 也比 DataParallel 方式要高效。
基本原则:
- 用
NCCL进行分布式GPU训练
- 用
Gloo进行分布式CPU训练
无限带宽互联的 GPU 集群
- 使用
NCCL,因为它是目前唯一支持InfiniBand和GPUDirect的后端
无限带宽和 GPU 直连
- 使用
NCCL,因为其目前提供最佳的分布式GPU训练性能。尤其是multiprocess single-node或multi-node distributed训练。
- 如果用
NCCL训练有问题,再考虑使用Gloo。(当前,Gloo在GPU分布式上,相较于NCCL慢)
无限带宽互联的 CPU 集群
- 如果
InfiniBand对IB启用IP,请使用Gloo,否则使使用MPI。
- 在未来将添加
infiniBand对Gloo的支持
以太网互联的 CPU 集群
- 使用
Gloo,除非有特别的原因使用MPI。
DistributedDataParallel
原型
功能
将给定的
module 进行分布式封装, 其将输入在 batch 维度上进行划分,并分配到指定的 devices 上。module 会被复制到每台机器的每个 GPU 上,每一个模型的副本处理输入的一部分。在反向传播阶段,每个机器的每个
GPU 上的梯度进行汇总并求平均。与 DataParallel 类似,batch size 应该大于 GPU 总数。注意
- 要使用该
class,需要先对torch.distributed进行初进程组始化,可以通过torch.distributed.init_process_group()实现。
- 该
module仅在gloo和nccl后端上可用。
- 根据分布式原理,
Constructor和differentiation of the output(或a function of the output of this module) 是一个分布式同步点。在不同的进程执行不同的代码时,需要考虑这一点。
- 该
module假设,所有的参数在其创建时,在模型中已经注册,之后没有新的参数加入或者参数移除。对于buffers也是一样。(这也是由分布式原理决定)
- 该
module假设,所有的参数在每个分布式进程模型中注册的顺序一致。该module自身将会按照该模型中参数注册的相反顺序执行梯度的all-reduction。换言之,用户应该保证,每个分布式进程模型一样,且参数注册顺序一致。(这也是由分布式原理决定)
- 如果计划使用该
module,并用NCCL后端或Gloo后端 (使用infiniband),需要与多workers的Dataloader一同使用,请修改多进程启动算法为forkserver(python 3 only) 或spawn。不幸的是,Gloo(使用infiniband) 和NCCL2 fork并不安全,并且如果不改变配置时,很可能会deadlocks。
- 在
module上定义的前向传播和反向传播hooks和其子modules将不会涉及,除非hooks在forward中进行了初始化。
- 在使用
DistributedDataParallel封装model后,不应该再修改模型的参数。也就是说,当使用DistributedDataParallel打包model时,DistributedDataParallel的constructor将会在模型上注册额外的归约函数,该函数作用于模型的所有参数。
如果在构建
DistributedDataParallel 之后,改变模型的参数,这是不被允许的,并且可能会导致不可预期的后果,因为部分参数的梯度归约函数可能不会被调用。- 在进程之间,参数永远不会进行
broadcast。该module对梯度执行一个all-reduce步骤,并假设在所有进程中,可以被optimizer以相同的方式进行更改。 在每一次迭代中,Buffers(BatchNorm stats等) 是进行broadcast的,从rank 0的进程中的module进行广播,广播到系统的其他副本中。
