Pytorch 分布式训练

date
May 27, 2022
Last edited time
Mar 27, 2023 08:52 AM
status
Published
slug
Pytorch分布式训练
tags
DL
summary
type
Post
Field
Plat

概述

DP

使用

DP 使用起来非常方便,只需要将原来单卡的 module 用 DP 改成多卡:

原理

DP 基于单机多卡,所有设备都负责计算和训练网络,除此之外, device[0] (并非 GPU 真实标号而是输入参数 device_ids 首位) 还要负责整合梯度,更新参数。图 1 即为 GPU 0 作为 device[0] 的例子。从图中我们可以看出,有三个主要过程:
  • 过程一(图中红色部分):各卡分别计算损失和梯度
  • 过程二(图中蓝色部分):所有梯度整合到 device[0]
  • 过程三(图中绿色部分):device[0] 进行参数更新,其他卡拉取 device[0] 的参数进行更新
所有卡都并行运算(图中红色),将梯度收集到 device[0](图中浅蓝色)和 device[0] 分享模型参数给其他 GPU(图中绿色)三个主要过程。
notion image
 

特点

  1. 负载不均衡
  1. 通信开销大
  1. 单进程,多线程。存在GIL锁
  1. 无法使用多机训练

DDP

使用

原理

DDP 也是数据并行,所以每张卡都有模型和输入。我们以多进程多线程为例,每起一个进程,该进程的 device[0] 都会从本地复制模型,如果该进程仍有多线程,就像 DP,模型会从 device[0] 复制到其他设备。
DDP 通过 Reducer 来管理梯度同步。为了提高通讯效率, Reducer 会将梯度归到不同的桶里(按照模型参数的 reverse order, 因为反向传播需要符合这样的顺序),一次归约一个桶。其中桶的大小为参数 bucket_cap_mb 默认为 25,可根据需要调整。下图即为一个例子。
可以看到每个进程里,模型参数都按照倒序放在桶里,每次归约一个桶。
notion image
DDP 通过在构建时注册 autograd hook 进行梯度同步。反向传播时,当一个梯度计算好后,相应的 hook 会告诉 DDP 可以用来归约。当一个桶里的梯度都可以了,Reducer 就会启动异步 allreduce 去计算所有进程的平均值。allreduce 异步启动使得 DDP 可以边计算边通信,提高效率。当所有桶都可以了,Reducer 会等所有 allreduce 完成,然后将得到的梯度写到 param.grad。

特点

  1. 多进程
    1. 和 DP 不同, DDP 采用多进程,最推荐的做法是每张卡一个进程从而避免单进程带来的影响。 DDP 同样支持单进程多线程多卡操作,自然也支持多进程多线程,不过需要注意一下 world_size。
  1. 通信效率
    1. DP 的通信成本随着 GPU 数量线性增长,而 DDP 支持 Ring AllReduce,其通信成本是恒定的,与 GPU 数量无关。
  1. 同步参数
    1. DP 通过收集梯度到 device[0],在device[0] 更新参数,然后其他设备复制 device[0] 的参数实现各个模型同步;
      DDP 通过保证初始状态相同并且改变量也相同(指同步梯度) ,保证模型同步。

Pytorch 分布式使用流程

基本概念

下面是分布式系统中常用的一些概念:
  • group
即进程组。默认情况下,只有一个组,一个 job 即为一个组,也即一个 world
当需要进行更加精细的通信时,可以通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。
  • world size
表示全局进程个数。
  • rank
表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。
  • local_rank
进程内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个进程内的第 1GPU

基本使用流程

Pytorch 中分布式的基本使用流程如下:
  1. 在使用 distributed 包的任何其他函数之前,需要使用 init_process_group 初始化进程组,同时初始化 distributed 包。
  1. 如果需要进行小组内集体通信,用 new_group 创建子分组
  1. 创建分布式并行模型 DDP(model, device_ids=device_ids)
  1. 为数据集创建 Sampler
  1. 使用启动工具 torch.distributed.launch 在每个主机上执行一次脚本,开始训练
  1. 使用 destory_process_group() 销毁进程组

使用模板

下面以 TCP 初始化方式为例,共 3 太主机,每台主机 2GPU,进行分布式训练。

TCP 初始化方式

执行方式

说明

  1. TCP 方式中,在 init_process_group 中必须手动指定以下参数
  1. rank 为当前进程的进程号
  1. word_size 为当前 job 的总进程数
  1. init_method 内指定 tcp 模式,且所有进程的 ip:port 必须一致,设定为主进程的 ip:port
  1. 必须在 rank==0 的进程内保存参数。
  1. 若程序内未根据 rank 设定当前进程使用的 GPUs,则默认使用全部 GPU,且以数据并行的方式使用。
  1. 每条命令表示一个进程。若已开启的进程未达到 word_size 的数量,则所有进程会一直等待
  1. 每台主机上可以开启多个进程。但是,若未为每个进程分配合适的 GPU,则同机不同进程可能会共用 GPU,应该坚决避免这种情况。
  1. 使用 gloo 后端进行 GPU 训练时,会报错。
  1. 若每个进程负责多块 GPU,可以利用多 GPU 进行模型并行。如下所示:

Env 初始化方式

代码

执行方式

说明

  1. Env 方式中,在 init_process_group 中,无需指定任何参数
  1. 必须在 rank==0 的进程内保存参数。
  1. 该方式下,使用 torch.distributed.launch 在每台主机上,为其创建多进程,其中:
  1. nproc_per_node 参数指定为当前主机创建的进程数。一般设定为当前主机的 GPU 数量
  1. nnodes 参数指定当前 job 包含多少个节点
  1. node_rank 指定当前节点的优先级
  1. master_addrmaster_port 分别指定 master 节点的 ip:port
  1. 若没有为每个进程合理分配 GPU,则默认使用当前主机上所有的 GPU。即使一台主机上有多个进程,也会共用 GPU
  1. 使用 torch.distributed.launch 工具时,将会为当前主机创建 nproc_per_node 个进程,每个进程独立执行训练脚本。同时,它还会为每个进程分配一个 local_rank 参数,表示当前进程在当前主机上的编号。例如:rank=2, local_rank=0 表示第 3 个节点上的第 1 个进程。
  1. 需要合理利用 local_rank 参数,来合理分配本地的 GPU 资源
  1. 每条命令表示一个进程。若已开启的进程未达到 word_size 的数量,则所有进程会一直等待

函数说明

初始化进程组

  • 函数声明
    • 函数作用
      • 该函数需要在每个进程中进行调用,用于初始化该进程。在使用分布式时,该函数必须在 distributed 内所有相关函数之前使用。
    • 参数详解
      • backend指定当前进程要使用的通信后端 小写字符串,支持的通信后端有 gloompinccl 。建议用 nccl
      • init_method指定当前进程组初始化方式 可选参数,字符串形式。如果未指定 init_methodstore,则默认为 env://,表示使用读取环境变量的方式进行初始化。该参数与 store 互斥。
      • rank指定当前进程的优先级 int 值。表示当前进程的编号,即优先级。如果指定 store 参数,则必须指定该参数。 rank=0 的为主进程,即 master 节点。
      • world_size : 该 job 中的总进程数。如果指定 store 参数,则需要指定该参数。
      • timeout指定每个进程的超时时间 可选参数,datetime.timedelta 对象,默认为 30 分钟。该参数仅用于 Gloo 后端。
      • store 所有 worker 可访问的 key / value,用于交换连接 / 地址信息。与 init_method 互斥。

    创建进程组

    • 函数声明
      • 函数作用
        • new_group() 函数可用于使用所有进程的任意子集来创建新组。其返回一个分组句柄,可作为 collectives 相关函数的 group 参数 。collectives 是分布式函数,用于特定编程模式中的信息交换。
      • 参数详解
        • ranks指定新分组内的成员的 ranks 列表list ,其中每个元素为 int
        • timeout指定该分组进程组内的操作的超时时间 可选参数,datetime.timedelta 对象,默认为 30 分钟。该参数仅用于 Gloo 后端。
        • backend指定要使用的通信后端 小写字符串,支持的通信后端有 gloonccl ,必须与 init_process_group() 中一致。

      获取进程组属性

      • 函数声明
        • 函数说明
          • 返回给定进程组的 backend
        • 参数
          • group要获取信息的进程组
            • 进程组对象,默认为主进程组。如果指定另一个进程组,则调用该函数的进程必须为所指定的进程组的进程。
          • 返回
            • 给定进程组的后端,以小写字符串的形式给出

        获取进程组进程优先级

        • 函数原型
          • 函数说明
            • 返回当前进程的 rank
              rank 是赋值给一个分布式进程组组内的每个进程的唯一识别。一般而言,rank 均为从 0world_size 的整数。
          • 参数
            • group
              • 要获取信息的进程组对象,默认为主进程组。如果指定另一个进程组,则调用该函数的进程必须为所指定的进程组的进程。

          获取进程组进程数

          • 函数原型
            • 函数说明
              • 返回当前进程组内的进程数。
            • 参数
              • group
                • 要获取信息的进程组对象,默认为主进程组。如果指定另一个进程组,则调用该函数的进程必须为所指定的进程组的进程。

            获取进程组是否初始化

            • 函数原型
              • 函数说明
                • 检查默认进程组是否被初始化。

              通信后端

              概述

              使用分布式时,在梯度汇总求平均的过程中,各主机之间需要进行通信。因此,需要指定通信的协议架构等。torch.distributed 对其进行了封装。
              torch.distributed 支持 3 种后端,分别为 NCCLGlooMPI。各后端对 CPU / GPU 的支持如下所示:
              Backen
              gloo
              mpi
              nccl
              Device
              CPU
              GPU
              CPU
              GPU
              CPU
              GPU
              send
              ?
              barrier
              ?

              如何选择

              强烈建议:NCCL 是目前最快的后端,且对多进程分布式(Multi-Process Single-GPU)支持极好,可用于单节点以及多节点的分布式训练。 节点即主机。即使是单节点,由于底层机制不同,distributed 也比 DataParallel 方式要高效。
              基本原则:
              • NCCL 进行分布式 GPU 训练
              • Gloo 进行分布式 CPU 训练
              无限带宽互联的 GPU 集群
              • 使用 NCCL,因为它是目前唯一支持 InfiniBandGPUDirect 的后端
              无限带宽和 GPU 直连
              • 使用 NCCL,因为其目前提供最佳的分布式 GPU 训练性能。尤其是 multiprocess single-nodemulti-node distributed 训练。
              • 如果用 NCCL 训练有问题,再考虑使用 Gloo。(当前,GlooGPU 分布式上,相较于 NCCL 慢)
              无限带宽互联的 CPU 集群
              • 如果 InfiniBandIB 启用 IP,请使用 Gloo,否则使使用 MPI
              • 在未来将添加 infiniBandGloo 的支持
              以太网互联的 CPU 集群
              • 使用 Gloo,除非有特别的原因使用 MPI

              DistributedDataParallel

              原型

              功能

              将给定的 module 进行分布式封装, 其将输入在 batch 维度上进行划分,并分配到指定的 devices 上。
              module 会被复制到每台机器的每个 GPU 上,每一个模型的副本处理输入的一部分。
              在反向传播阶段,每个机器的每个 GPU 上的梯度进行汇总并求平均。与 DataParallel 类似,batch size 应该大于 GPU 总数。

              注意

              1. 要使用该 class,需要先对 torch.distributed 进行初进程组始化,可以通过 torch.distributed.init_process_group() 实现。
              1. module 仅在 gloonccl 后端上可用。
              1. 根据分布式原理,Constructordifferentiation of the output (或 a function of the output of this module) 是一个分布式同步点。在不同的进程执行不同的代码时,需要考虑这一点。
              1. module 假设,所有的参数在其创建时,在模型中已经注册,之后没有新的参数加入或者参数移除。对于 buffers 也是一样。(这也是由分布式原理决定)
              1. module 假设,所有的参数在每个分布式进程模型中注册的顺序一致。该 module 自身将会按照该模型中参数注册的相反顺序执行梯度的 all-reduction。换言之,用户应该保证,每个分布式进程模型一样,且参数注册顺序一致。(这也是由分布式原理决定)
              1. 如果计划使用该 module,并用 NCCL 后端或 Gloo 后端 (使用 infiniband),需要与多 workersDataloader 一同使用,请修改多进程启动算法为 forkserver (python 3 only) 或 spawn 。不幸的是,Gloo (使用 infiniband) 和 NCCL2 fork 并不安全,并且如果不改变配置时,很可能会 deadlocks
              1. module 上定义的前向传播和反向传播 hooks 和其子 modules 将不会涉及,除非 hooksforward 中进行了初始化。
              1. 在使用 DistributedDataParallel 封装 model 后,不应该再修改模型的参数。也就是说,当使用 DistributedDataParallel 打包 model 时,DistributedDataParallelconstructor 将会在模型上注册额外的归约函数,该函数作用于模型的所有参数。
              如果在构建 DistributedDataParallel 之后,改变模型的参数,这是不被允许的,并且可能会导致不可预期的后果,因为部分参数的梯度归约函数可能不会被调用。
              1. 在进程之间,参数永远不会进行 broadcast。该 module 对梯度执行一个 all-reduce 步骤,并假设在所有进程中,可以被 optimizer 以相同的方式进行更改。 在每一次迭代中,Buffers (BatchNorm stats 等) 是进行 broadcast 的,从 rank 0 的进程中的 module 进行广播,广播到系统的其他副本中。

              © Lazurite 2021 - 2023