请问如果不使用 model.compile () 和 model.fit () 而是手动进行模型训练(实现 tf.keras.model,使用 apply_gradients)该如何使用分布式训练方式?
可以参考 https://www.tensorflow.org/guide/distributed_training#using_tfdistributestrategy_with_custom_training_loops 里的 Using tf.distribute.Strategy with custom training loops
非常感谢!
雪麟老师您好,tensorflow2.0 基础部分已经看完了,感谢您提供这么清晰实用的教程~ 下一步想做一些实际项目的工程实践,想咨询一下您有合适的资料推荐吗?~ 非常感谢
我在 https://tf.wiki/zh_hans/appendix/recommended_books.html 中列举了一些我认为不错的资料可参考。关于实践,如果侧重模型开发构建,建议尝试去复现一些经典论文中的算法。如果侧重部署,则建议自己尝试去开发一个小型的,用到人工智能技术的项目(例如 app 之类的)。具体的项目需要结合你自身的研究方向或者工程背景(技术栈),我觉得和你自身背景越贴切越好。
老师你好,想请问下 strategy = tf.distribute.MirroredStrategy () 支持 Model 子类化创建自定义模型使用多 gpu 分布式训练么,我自定义了一个 Resnet18 模型进行 cifar100 的多分布训练,但训练效果很差,测试集 acc 才 1%左右,好像参数无法更新,而在改变 Sequential 按层顺序创建模型,其他不变,效果就提升很多,
构建模型的代码如下,麻烦看下有什么问题:
with strategy.scope ():
# 定义一个 3x3 卷积
def regularized_padded_conv (*args,**kwargs):
return layers.Conv2D (*args,**kwargs,padding='same',kernel_regularizer=regularizers.l2 (5e-5),
use_bias=False,kernel_initializer='glorot_normal')
# 定义 Basic Block 模块。对应 Resnet18 和 Resnet34
class BasecBlock (layers.Layer):
expansion=1
def __init__(self,in_channels,out_channels,stride=1):
super (BasecBlock,self).__init__()
#1
self.conv1=regularized_padded_conv (out_channels,kernel_size=3,strides=stride)
self.bn1=layers.BatchNormalization ()
#2
self.conv2=regularized_padded_conv (out_channels,kernel_size=3,strides=1)
self.bn2=layers.BatchNormalization ()
#3
if stride!=1 or in_channels!=self.expansion * out_channels:
self.shortcut= Sequential ([regularized_padded_conv (self.expansion * out_channels,kernel_size=3,strides=stride),
layers.BatchNormalization ()])
else :
self.shortcut= lambda x ,_ : x
@tf.function
def call (self,inputs,training=False):
x=self.conv1 (inputs)
x=self.bn1 (x,training=training)
x=tf.nn.relu (x)
x=self.conv2 (x)
x=self.bn2 (x,training=training)
x_short=self.shortcut (inputs,training)
x=x+x_short
out=tf.nn.relu (x)
return out
# 自定义模型,ResBlock 模块。继承 keras.Model 或者 keras.Layer 都可以
class ResNet (tf.keras.Model):
# 第 1 个参数 blocks:对应 2 个自定义模块 BasicBlock 和 Bottleneck, 其中 BasicBlock 对应 res18 和 res34,Bottleneck 对应 res50,res101 和 res152,
# 第 2 个参数 layer_dims:[2, 2, 2, 2] 4 个 Res Block,每个包含 2 个 Basic Block
# 第 3 个参数 num_classes:我们的全连接输出,取决于输出有多少类
def __init__(self,blocks,layer_dims,initial_filters=64,num_classes=100):
super (ResNet,self).__init__()
self.in_channels=initial_filters
#
self.stem=Sequential ([regularized_padded_conv (initial_filters,kernel_size=3,strides=1),
layers.BatchNormalization ()])
#
self.layer1=self.build_resblock (blocks,initial_filters,layer_dims [0],stride=1)
self.layer2=self.build_resblock (blocks,initial_filters*2,layer_dims [1],stride=2)
self.layer3=self.build_resblock (blocks,initial_filters*4,layer_dims [2],stride=2)
self.layer4=self.build_resblock (blocks,initial_filters*8,layer_dims [3],stride=2)
self.final_bn=layers.BatchNormalization ()
self.avg_pool=layers.GlobalAveragePooling2D ()
self.dense=layers.Dense (num_classes)
def build_resblock (self,blocks,out_channels,num_blocks,stride):
strides=[stride]+[1]*(num_blocks-1)
res_block=Sequential ()
for stride in strides:
res_block.add (blocks (self.in_channels,out_channels,stride))
self.in_channels=out_channels
return res_block
@tf.function
def call (self,inputs,training):
x=self.stem (inputs,training)
x=tf.nn.relu (x)
x=self.layer1 (x,training=training)
x=self.layer2 (x,training=training)
x=self.layer3 (x,training=training)
x=self.layer4 (x,training=training)
x=self.final_bn (x,training=training)
x=tf.nn.relu (x)
x=self.avg_pool (x)
out=self.dense (x)
return out
def resnet18 ():
return ResNet (BasecBlock,[2,2,2,2])
model=resnet18 ()
model.build (input_shape=(None,32,32,3))
tensorflow.org/tutorials/distribute/custom_training#create_the_model 说是可以 Subclassing,不过似乎目前还是 Sequential 模式比较不容易出问题。分布式训练的 debug 就比较麻烦,建议一层一层地加入然后调试是哪一层出了问题。
如果是多机多卡,是在MultiWorkerMirroredStrategy下再开一个多卡策略?
我没有试过既多机又多卡的环境,不过看官方文档上的说明,似乎 MultiWorkerMirroredStrategy 就够了
tf.distribute.experimental.MultiWorkerMirroredStrategy
is very similar toMirroredStrategy
. It implements synchronous distributed training across multiple workers, each with potentially multiple GPUs.
雪麒老师您好,我在用单机多卡的环境下试图用MirroredStrategy做分布式训练,但它会提示WARNING: tensorflow: Not all devices in tf.distribute.Strategy
are visible to TensorFlow。且训练速度与单卡相同,用terminal检查gpu显存占用情况,也是只有一张卡占用率很高,其他卡占用率很低,请问您知道这是什么情况吗?
期待您的解答,谢谢!
我似乎找到了问题所在,似乎是服务器没有安装NCCL导致的
请问如果不使用gpu训练,而是使用cpu训练,比如deepfm等算法,如何进行分布式训练呢
我所了解的TensorFlow下的分布式训练大多是用于单机多GPU或者多机训练。如果想要单机多CPU核的并行训练,可能需要具体算法具体分析,可以看一下网上的开源代码,不一定要用 TensorFlow。
请问老师,如果我在多个物理机上进行TensorFlow分布式训练,并且没有使用分布式文件系统,那么在训练前需要手工把数据分发到不同机器上吗?
hi,老师你好:我抄了书上的例子,进行单机多卡训练。但现象是:
- 仅从训练时间看,确实有倍数的加快;
- 各个GPU占用了显存,但处于不计算的状态,且这种等待时间很长。
- 我怀疑:数据处理方面占用了很多时间,但不知道优化方案是怎样的?
以下是代码:
import tensorflow as tf
import tensorflow_datasets as tfds
import time
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1,2,3'
num_epochs = 5
batch_size_per_replica = 64
learing_rate = 0.001
print('strategy init')
strategy = tf.distribute.MirroredStrategy(devices=['/gpu:1', '/gpu:2', '/gpu:3'])
print('Number of devices: %d' % (strategy.num_replicas_in_sync))
batch_size = batch_size_per_replica * strategy.num_replicas_in_sync
def resize(image, label):
image = tf.image.resize(image, [224, 224])/255.0
return image, label
dataset = tfds.load('cats_vs_dogs', split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(map_func=resize).cache()
dataset = dataset.shuffle(1024)
dataset = dataset.batch(batch_size)
with strategy.scope():
model = tf.keras.applications.MobileNetV2()
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=learing_rate),
loss=tf.keras.losses.sparse_categorical_crossentropy,
metrics=[tf.keras.metrics.sparse_categorical_accuracy]
)
print('model fit')
start_time = time.time()
model.fit(dataset, epochs=num_epochs)
end_time = time.time()
print('time : %f' % (end_time-start_time))
第二种策略,代码会报错
RuntimeError Traceback (most recent call last)
<ipython-input-4-df7add50a6f4> in <module>()
15 'task': {'type': 'worker', 'index': 0}
16 })
---> 17 strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
18 batch_size = batch_size_per_replica * num_workers
19
6 frames
/usr/local/lib/python3.6/dist-packages/tensorflow/python/eager/context.py in configure_collective_ops(self, collective_leader, scoped_allocator_enabled_ops, use_nccl_communication, device_filters)
727
728 if self._context_handle is not None:
--> 729 raise RuntimeError("Collective ops must be configured at program startup")
730
731 self._collective_leader = collective_leader
RuntimeError: Collective ops must be configured at program startup
老师您好,在第二种情况,多机cpu训练的情况下程序一直在等待,
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2021-03-26 14:49:55.649453: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-26 14:49:55.661519: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-03-26 14:49:55.685741: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job ps → {0 → localhost:2222}
2021-03-26 14:49:55.696396: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:301] Initialize GrpcChannelCache for job worker → {0 → localhost:2223, 1 → localhost:2224}
2021-03-26 14:49:55.707198: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:411] Started server with target: grpc://localhost:2224
WARNING:tensorflow:Enabled NCCL communication but no GPUs detected/specified.
这是什么原因啊
加一行dataset = dataset.prefetch(tf.data.AUTOTUNE)