TensorFlow: how to run distributed training with a built-in Estimator

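Following the official TensorFlow description of tf.estimator.train_and_evaluate, I put together a simple test case (code below).
It runs fine on a single machine, but when I switch to distributed mode as the documentation describes, training never starts: the program simply hangs at the train step. Could someone help me figure out where the problem is? Many thanks.
Also, if anyone has a small working example of distributed training with a built-in Estimator such as DNNClassifier, please share it. Thanks!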

1) The training and evaluation code using the Estimator:

import tensorflow as tf
import os
import sys
import json
import logging
import numpy as np

# Toy data: 1000 random scalar features with random binary labels.
x = np.random.rand(1000)
y = np.random.choice([0, 1], 1000)


def data_input():
    # Returns the whole data set as one batch: (features dict, labels).
    ret = {}
    ret['x'] = x
    y_batch = y
    print("data")  # marks that the input function was called (Python 2 and 3)
    return ret, y_batch


tf.logging.set_verbosity(tf.logging.DEBUG)
my_feature_columns = []
v_feature_column = tf.feature_column.numeric_column(key="x", shape=[])
my_feature_columns.append(v_feature_column)

estimator = tf.estimator.DNNClassifier(
    feature_columns=my_feature_columns,
    hidden_units=[1024, 512, 256],
    model_dir='/home/clxman/tf/')


train_spec = tf.estimator.TrainSpec(input_fn=lambda: data_input(), max_steps=1000)
eval_spec = tf.estimator.EvalSpec(input_fn=lambda: data_input())

# Reads TF_CONFIG (if set) to decide between local and distributed mode.
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

2) Running on a single machine gives the output below; training proceeds normally:

lxman@clxman-VirtualBox:~/test$ python test_c.py
/home/clxman/.local/lib/python2.7/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype from `float` to `np.floating` is deprecated. In future, it will be treated as `np.float64 == np.dtype(float).type`.
  from ._conv import register_converters as _register_converters
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_task_type': 'worker', '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f4e37077890>, '_evaluation_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 0, '_tf_random_seed': None, '_master': '', '_num_worker_replicas': 1, '_task_id': 0, '_log_step_count_steps': 100, '_model_dir': '/home/clxman/tf/', '_global_id_in_cluster': 0, '_save_summary_steps': 100}
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after 600 secs (eval_spec.throttle_secs) or training is finished.
data
INFO:tensorflow:Calling model_fn.
DEBUG:tensorflow:Transforming feature_column _NumericColumn(key='x', shape=(), default_value=None, dtype=tf.float32, normalizer_fn=None).
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
2018-06-12 23:50:25.702344: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Saving checkpoints for 1 into /home/clxman/tf/model.ckpt.
INFO:tensorflow:loss = 693.219, step = 1
INFO:tensorflow:global_step/sec: 8.12489
INFO:tensorflow:loss = 691.08575, step = 101 (12.309 sec)
INFO:tensorflow:global_step/sec: 8.11321
INFO:tensorflow:loss = 690.9834, step = 201 (12.325 sec)
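
As a quick sanity check on the log above: with random binary labels and an untrained network, the expected cross-entropy per example is about ln 2. Assuming the classifier sums the loss over the 1000 examples that data_input returns (an assumption about the default loss reduction in this TensorFlow version, not something the log states), the first reported loss should be close to 1000 * ln 2, which is consistent with the 693.219 logged at step 1. A small sketch of that arithmetic:

```python
import math

batch_size = 1000  # data_input returns all 1000 examples as one batch

# Cross-entropy of a 50/50 prediction on a binary label is ln(2).
# Assumption: the loss is summed (not averaged) over the batch.
expected_initial_loss = batch_size * math.log(2)
logged_initial_loss = 693.219  # from "loss = 693.219, step = 1"

relative_difference = (
    abs(logged_initial_loss - expected_initial_loss) / expected_initial_loss
)
print("expected initial loss: %.3f" % expected_initial_loss)
print("relative difference:   %.5f" % relative_difference)
```

Since the labels are random, the loss staying near this value in later steps is not surprising either.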

3) Next I start several processes from the command line, setting the TF_CONFIG environment variable for each one and then launching the test_c.py script.
chief:

clxman@clxman-VirtualBox:~/test$ TF_CONFIG='{
    "cluster": {
        "chief": ["192.168.6.99.123:2222"],
        "worker": ["192.168.6.99.123:2300"],
        "ps": ["192.168.6.99.123:2400"]
    },
    "task": {"type": "chief", "index": 0}
}'  python test_c.py
ps:
clxman@clxman-VirtualBox:~/test$ TF_CONFIG='{
    "cluster": {
        "chief": ["192.168.6.99.123:2222"],
        "worker": ["192.168.6.99.123:2300"],
        "ps": ["192.168.6.99.123:2400"]
    },
    "task": {"type": "ps", "index": 0}
}'  python test_c.py

worker:
clxman@clxman-VirtualBox:~/test$ TF_CONFIG='{
    "cluster": {
        "chief": ["192.168.6.99.123:2222"],
        "worker": ["192.168.6.99.123:2300"],
        "ps": ["192.168.6.99.123:2400"]
    },
    "task": {"type": "worker", "index": 0}
}'  python test_c.py

evaluator:

clxman@clxman-VirtualBox:~/test$ TF_CONFIG='{
    "cluster": {
        "chief": ["192.168.6.99.123:2222"],
        "worker": ["192.168.6.99.123:2300"],
        "ps": ["192.168.6.99.123:2400"]
    },
    "task": {"type": "evaluator", "index": 0}
}'  python test_c.py
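
The four commands above differ only in the "task" entry. A minimal sketch of generating each TF_CONFIG value from one shared cluster dictionary, so that the cluster section cannot drift between processes. The helper name make_tf_config is hypothetical, and the addresses are copied unchanged from the commands above:

```python
import json

# Cluster definition copied as-is from the commands above.
CLUSTER = {
    "chief": ["192.168.6.99.123:2222"],
    "worker": ["192.168.6.99.123:2300"],
    "ps": ["192.168.6.99.123:2400"],
}


def make_tf_config(task_type, index=0):
    """Return the TF_CONFIG JSON string for one process (hypothetical helper)."""
    return json.dumps({
        "cluster": CLUSTER,
        "task": {"type": task_type, "index": index},
    })


if __name__ == "__main__":
    # Print one launch command per role, mirroring the commands above.
    for role in ("chief", "ps", "worker", "evaluator"):
        print("TF_CONFIG='%s' python test_c.py" % make_tf_config(role))
```

As in the commands above, the evaluator only appears as a task type and is not listed in the cluster section.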

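4) Output (taken from the chief node): the log is below. No training takes place. The "data" line shows that data_input was called to fetch the data,
but no model checkpoint is written to /home/clxman/tf; the directory contains only event files.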

INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_task_type': u'chief', '_train_distribute': None, '_is_chief': True, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7fc0024f18d0>, '_evaluation_master': '', '_save_checkpoints_steps': None, '_keep_checkpoint_every_n_hours': 10000, '_service': None, '_num_ps_replicas': 1, '_tf_random_seed': None, '_master': u'grpc://192.168.6.99.123:2222', '_num_worker_replicas': 2, '_task_id': 0, '_log_step_count_steps': 100, '_model_dir': '/home/clxman/tf/', '_global_id_in_cluster': 0, '_save_summary_steps': 100}
INFO:tensorflow:Start Tensorflow server.
2018-06-12 23:23:25.694865: I tensorflow/core/platform/cpu_feature_guard.cc:140] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2
2018-06-12 23:23:25.697065: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job chief -> {0 -> localhost:2222}
2018-06-12 23:23:25.697159: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> 192.168.6.99.123:2400}
2018-06-12 23:23:25.697180: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> 192.168.6.99.123:2300}
2018-06-12 23:23:25.698882: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:332] Started server with target: grpc://localhost:2222
data
INFO:tensorflow:Calling model_fn.
DEBUG:tensorflow:Transforming feature_column _NumericColumn(key='x', shape=(), default_value=None, dtype=tf.float32, normalizer_fn=None).
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
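
The chief log stops right after "Graph was finalized.", presumably while it waits for the other tasks in the cluster. One thing that can be checked without TensorFlow is whether every address in TF_CONFIG is well formed and accepting TCP connections. A rough sketch, assuming plain dotted IPv4 addresses are intended; the function names are hypothetical:

```python
import json
import os
import socket


def is_ipv4(host):
    """True if host is a dotted-quad IPv4 address (four numbers in 0..255)."""
    parts = host.split(".")
    return len(parts) == 4 and all(p.isdigit() and int(p) <= 255 for p in parts)


def can_connect(host, port, timeout=2.0):
    """True if a TCP connection to host:port succeeds within the timeout."""
    try:
        sock = socket.create_connection((host, port), timeout)
        sock.close()
        return True
    except (socket.error, ValueError):
        return False


def check_cluster(tf_config_json, probe=True):
    """Yield (job, address, is_ipv4, reachable) for every task in the cluster."""
    cluster = json.loads(tf_config_json).get("cluster", {})
    for job, addresses in sorted(cluster.items()):
        for address in addresses:
            host, _, port = address.rpartition(":")
            reachable = can_connect(host, int(port)) if probe else None
            yield job, address, is_ipv4(host), reachable


if __name__ == "__main__":
    # Run with the same TF_CONFIG that is passed to test_c.py.
    for row in check_cluster(os.environ.get("TF_CONFIG", "{}")):
        print("%-8s %-24s ipv4=%s reachable=%s" % row)
```

With the TF_CONFIG shown above, the host part as written (192.168.6.99.123) has five dot-separated fields, so is_ipv4 reports False for every entry. Whether that is how the cluster was really configured or only how it was pasted here is worth double-checking.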



chengliang 2018-6-13 00:08:45