TensorFlow Distributed Training

https://tf.wiki/zh/appendix/distributed.html


If I don't use model.compile() and model.fit() but instead train the model manually (subclassing tf.keras.Model and calling apply_gradients), how should I use distributed training?

You can refer to the section "Using tf.distribute.Strategy with custom training loops" at https://www.tensorflow.org/guide/distributed_training#using_tfdistributestrategy_with_custom_training_loops
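
For reference, a minimal sketch of that pattern (the model, optimizer, and toy dataset below are placeholders of my own; adjust them to your task). The key points are creating the variables inside strategy.scope(), computing the per-example loss with reduction=NONE and averaging over the global batch size, and wrapping the step in strategy.run:

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()
global_batch_size = 64 * strategy.num_replicas_in_sync

# Toy dataset; replace with your own input pipeline.
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.random.normal([1024, 32]),
     tf.random.uniform([1024], maxval=10, dtype=tf.int32))
).batch(global_batch_size)
dist_dataset = strategy.experimental_distribute_dataset(dataset)

with strategy.scope():
    # Model, optimizer and loss must be created inside the scope.
    model = tf.keras.Sequential([tf.keras.layers.Dense(64, activation='relu'),
                                 tf.keras.layers.Dense(10)])
    optimizer = tf.keras.optimizers.Adam(1e-3)
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True, reduction=tf.keras.losses.Reduction.NONE)

def compute_loss(labels, logits):
    # Average the per-example loss over the *global* batch size, not the per-replica one.
    per_example_loss = loss_object(labels, logits)
    return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

def train_step(inputs):
    x, y = inputs
    with tf.GradientTape() as tape:
        logits = model(x, training=True)
        loss = compute_loss(y, logits)
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss

@tf.function
def distributed_train_step(inputs):
    # Run the step on every replica and sum the per-replica losses.
    per_replica_losses = strategy.run(train_step, args=(inputs,))
    return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)

for epoch in range(5):
    for batch in dist_dataset:
        loss = distributed_train_step(batch)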

Thank you very much!

Hello teacher 雪麒, I've finished the TensorFlow 2.0 basics. Thank you for providing such a clear and practical tutorial! As a next step I'd like to get some hands-on engineering practice with real projects. Could you recommend any suitable resources? Thanks a lot :slight_smile:

I've listed some resources I consider good at https://tf.wiki/zh_hans/appendix/recommended_books.html. As for practice: if you're focused on model development and construction, I'd suggest trying to reproduce the algorithms from some classic papers. If you're focused on deployment, I'd suggest building a small project of your own that uses AI techniques (an app, for example). The specific project should fit your own research direction or engineering background (tech stack); the closer it is to your own background, the better.

Hello, I'd like to ask whether strategy = tf.distribute.MirroredStrategy() supports multi-GPU distributed training with a custom model created by subclassing Model. I defined a ResNet18 model for multi-GPU training on cifar100, but the results are very poor: test accuracy is only about 1%, as if the parameters were never updated. When I instead build the model layer by layer with Sequential, keeping everything else unchanged, the results improve a lot.
The model-building code is below; could you take a look at what might be wrong?

import tensorflow as tf
from tensorflow.keras import layers, regularizers, Sequential

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # 3x3 convolution with L2 regularization, no bias, 'same' padding
    def regularized_padded_conv(*args, **kwargs):
        return layers.Conv2D(*args, **kwargs, padding='same',
                             kernel_regularizer=regularizers.l2(5e-5),
                             use_bias=False, kernel_initializer='glorot_normal')

    # Basic Block module, used by ResNet18 and ResNet34
    class BasicBlock(layers.Layer):
        expansion = 1

        def __init__(self, in_channels, out_channels, stride=1):
            super(BasicBlock, self).__init__()
            # 1st convolution
            self.conv1 = regularized_padded_conv(out_channels, kernel_size=3, strides=stride)
            self.bn1 = layers.BatchNormalization()

            # 2nd convolution
            self.conv2 = regularized_padded_conv(out_channels, kernel_size=3, strides=1)
            self.bn2 = layers.BatchNormalization()

            # Projection shortcut when the spatial size or channel count changes
            if stride != 1 or in_channels != self.expansion * out_channels:
                self.shortcut = Sequential([
                    regularized_padded_conv(self.expansion * out_channels, kernel_size=3, strides=stride),
                    layers.BatchNormalization()])
            else:
                self.shortcut = lambda x, _: x

        @tf.function
        def call(self, inputs, training=False):
            x = self.conv1(inputs)
            x = self.bn1(x, training=training)
            x = tf.nn.relu(x)

            x = self.conv2(x)
            x = self.bn2(x, training=training)

            x_short = self.shortcut(inputs, training)

            x = x + x_short
            out = tf.nn.relu(x)

            return out

    # Custom model built from residual blocks; either keras.Model or keras.Layer works as the base class
    class ResNet(tf.keras.Model):

        # 1st argument blocks: the block class, BasicBlock (ResNet18/34) or Bottleneck (ResNet50/101/152)
        # 2nd argument layer_dims: e.g. [2, 2, 2, 2] means 4 residual stages with 2 BasicBlocks each
        # 3rd argument num_classes: size of the fully connected output, i.e. the number of classes
        def __init__(self, blocks, layer_dims, initial_filters=64, num_classes=100):
            super(ResNet, self).__init__()

            self.in_channels = initial_filters

            # Stem: initial convolution + batch normalization
            self.stem = Sequential([
                regularized_padded_conv(initial_filters, kernel_size=3, strides=1),
                layers.BatchNormalization()])

            # Four residual stages
            self.layer1 = self.build_resblock(blocks, initial_filters,     layer_dims[0], stride=1)
            self.layer2 = self.build_resblock(blocks, initial_filters * 2, layer_dims[1], stride=2)
            self.layer3 = self.build_resblock(blocks, initial_filters * 4, layer_dims[2], stride=2)
            self.layer4 = self.build_resblock(blocks, initial_filters * 8, layer_dims[3], stride=2)

            self.final_bn = layers.BatchNormalization()

            self.avg_pool = layers.GlobalAveragePooling2D()
            self.dense = layers.Dense(num_classes)

        def build_resblock(self, blocks, out_channels, num_blocks, stride):
            # The first block of a stage may downsample; the remaining blocks keep stride 1
            strides = [stride] + [1] * (num_blocks - 1)
            res_block = Sequential()

            for stride in strides:
                res_block.add(blocks(self.in_channels, out_channels, stride))
                self.in_channels = out_channels
            return res_block

        @tf.function
        def call(self, inputs, training):
            x = self.stem(inputs, training)
            x = tf.nn.relu(x)

            x = self.layer1(x, training=training)
            x = self.layer2(x, training=training)
            x = self.layer3(x, training=training)
            x = self.layer4(x, training=training)

            x = self.final_bn(x, training=training)
            x = tf.nn.relu(x)
            x = self.avg_pool(x)
            out = self.dense(x)

            return out

    def resnet18():
        return ResNet(BasicBlock, [2, 2, 2, 2])

    model = resnet18()
    model.build(input_shape=(None, 32, 32, 3))

tensorflow.org/tutorials/distribute/custom_training#create_the_model says subclassing is supported, but at the moment the Sequential style does seem to run into fewer problems. Debugging distributed training is rather painful; I'd suggest adding layers back one at a time to pin down which layer causes the issue.
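
To make that layer-by-layer debugging concrete, here is a minimal sanity check I would start from (my own sketch, not part of the reply above): a tiny subclassed model trained under MirroredStrategy on CIFAR-100. If this learns, add the custom ResNet blocks back one stage at a time until the accuracy collapses.

import tensorflow as tf

strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    class TinyNet(tf.keras.Model):
        # Smallest possible subclassed model: one conv, global pooling, one dense layer.
        def __init__(self, num_classes=100):
            super().__init__()
            self.conv = tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu')
            self.pool = tf.keras.layers.GlobalAveragePooling2D()
            self.dense = tf.keras.layers.Dense(num_classes)

        def call(self, inputs, training=False):
            x = self.conv(inputs)
            x = self.pool(x)
            return self.dense(x)

    model = TinyNet()
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

(x_train, y_train), _ = tf.keras.datasets.cifar100.load_data()
model.fit(x_train / 255.0, y_train, batch_size=64 * strategy.num_replicas_in_sync, epochs=1)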

For multi-machine multi-GPU training, do I need to open another multi-GPU strategy inside MultiWorkerMirroredStrategy?

I haven't tried a multi-machine, multi-GPU environment myself, but judging from the official documentation, MultiWorkerMirroredStrategy alone should be enough:

tf.distribute.experimental.MultiWorkerMirroredStrategy is very similar to MirroredStrategy. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs.
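
For reference, a minimal multi-worker sketch (the host names, ports, and model are placeholders of my own): every machine runs the same script with its own TF_CONFIG, and the strategy automatically mirrors the model across all GPUs of all workers.

import json
import os
import tensorflow as tf

# TF_CONFIG must be set, and the strategy created, at the start of the program.
# Host names and ports are placeholders; set 'index' to 1 on the second machine.
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {'worker': ['host1:20000', 'host2:20000']},
    'task': {'type': 'worker', 'index': 0}
})

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

with strategy.scope():
    model = tf.keras.Sequential([tf.keras.layers.Flatten(),
                                 tf.keras.layers.Dense(10)])
    model.compile(optimizer='adam',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
# model.fit(...) then runs synchronously across both workers.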

Hello teacher 雪麒, I'm trying to use MirroredStrategy for distributed training in a single-machine multi-GPU environment, but it reports WARNING:tensorflow:Not all devices in tf.distribute.Strategy are visible to TensorFlow. The training speed is the same as with a single GPU, and when I check GPU memory usage in the terminal, only one card shows high utilization while the others stay very low. Do you know what might be going on?
Looking forward to your answer, thank you!

I think I've found the problem: it seems to be caused by NCCL not being installed on the server.
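
If installing NCCL is not an option, one workaround worth trying (my assumption, not something verified in this thread) is to ask MirroredStrategy for a non-NCCL all-reduce implementation:

import tensorflow as tf

# Use hierarchical copy instead of the default NCCL all-reduce for cross-device communication.
strategy = tf.distribute.MirroredStrategy(
    cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
print('Number of devices: %d' % strategy.num_replicas_in_sync)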


If I'm training on CPU rather than GPU, for example for algorithms like DeepFM, how can I do distributed training?

Most of the distributed training I know of in TensorFlow targets single-machine multi-GPU or multi-machine setups. If you want single-machine multi-CPU-core parallel training, it probably needs to be considered algorithm by algorithm; have a look at open-source code online, and you don't necessarily have to use TensorFlow.
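
One thing worth adding (my own note, not part of the reply above): on a single machine, TensorFlow already spreads individual ops across CPU cores, and the thread pools can be tuned before any op runs, which is sometimes all the "multi-core training" that is needed:

import tensorflow as tf

# Must be called before TensorFlow executes its first op.
tf.config.threading.set_intra_op_parallelism_threads(8)  # threads used inside a single op
tf.config.threading.set_inter_op_parallelism_threads(4)  # ops executed concurrently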

Hello teacher, if I run TensorFlow distributed training across several physical machines without a distributed file system, do I need to manually copy the data to each machine before training?

Hi, hello teacher: I copied the example from the book and ran single-machine multi-GPU training. What I observe is:

  1. Judging purely from the training time, there is indeed a several-fold speedup;
  2. Every GPU has allocated memory but sits in a non-computing state, and this waiting lasts a long time;
  3. My suspicion is that data processing is taking up a lot of the time, but I don't know what the optimization would look like.

Here is the code:

import tensorflow as tf
import tensorflow_datasets as tfds
import time
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1,2,3'

num_epochs = 5
batch_size_per_replica = 64
learning_rate = 0.001
print('strategy init')
strategy = tf.distribute.MirroredStrategy(devices=['/gpu:1', '/gpu:2', '/gpu:3'])
print('Number of devices: %d' % (strategy.num_replicas_in_sync))
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync

def resize(image, label):
    image = tf.image.resize(image, [224, 224])/255.0
    return image, label
dataset = tfds.load('cats_vs_dogs', split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(map_func=resize).cache()
dataset = dataset.shuffle(1024)
dataset = dataset.batch(batch_size)
with strategy.scope():
    model = tf.keras.applications.MobileNetV2()
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )
print('model fit')
start_time = time.time()
model.fit(dataset, epochs=num_epochs)
end_time = time.time()
print('time : %f' % (end_time-start_time))

With the second strategy (MultiWorkerMirroredStrategy), the code raises an error:

RuntimeError                              Traceback (most recent call last)
<ipython-input-4-df7add50a6f4> in <module>()
     15     'task': {'type': 'worker', 'index': 0}
     16 })
---> 17 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
     18 batch_size = batch_size_per_replica * num_workers
     19 

6 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/eager/context.py in configure_collective_ops(self, collective_leader, scoped_allocator_enabled_ops, use_nccl_communication, device_filters)
    727 
    728     if self._context_handle is not None:
--> 729       raise RuntimeError("Collective ops must be configured at program startup")
    730 
    731     self._collective_leader = collective_leader

RuntimeError: Collective ops must be configured at program startup

Hello teacher, in the second case, multi-machine CPU training, the program just keeps waiting:

To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-03-26 14:49:55.649453: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-26 14:49:55.661519: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-26 14:49:55.685741: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222}
2021-03-26 14:49:55.696396: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2223, 1 -> localhost:2224}
2021-03-26 14:49:55.707198: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:2224
WARNING:tensorflow:Enabled NCCL communication but no GPUs detected/specified.

What could be causing this?

Add a line: dataset = dataset.prefetch(tf.data.AUTOTUNE)
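
For context, this is where that line would go in the input pipeline from the code above (num_parallel_calls is an extra tweak of mine, not part of the original suggestion):

dataset = tfds.load('cats_vs_dogs', split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(map_func=resize, num_parallel_calls=tf.data.AUTOTUNE).cache()
dataset = dataset.shuffle(1024)
dataset = dataset.batch(batch_size)
# Overlap host-side preprocessing with training on the GPUs.
dataset = dataset.prefetch(tf.data.AUTOTUNE)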

Hello teacher, during single-machine multi-GPU training, while encoding inside a custom evaluate callback, the following keeps being printed endlessly:

How should I solve this?