Running a distributed TensorFlow Estimator does not trigger evaluation or export

I am experimenting with distributed training using TensorFlow estimators. In my example I fit a simple sine function with a custom estimator using tf.estimator.train_and_evaluate. After training and evaluation I want to export the model so that it is ready for TensorFlow Serving. However, evaluation and export are only triggered when the estimator is run in a non-distributed way.

The model and estimator are defined as follows:

    import math
    import random

    import tensorflow as tf


    def my_model(features, labels, mode):
        # define simple dense network
        net = tf.layers.dense(features['x'], units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)
        net = tf.layers.dense(net, units=8, activation=tf.nn.tanh)

        # output layer
        predictions = tf.layers.dense(net, units=1, activation=tf.nn.tanh)

        if mode == tf.estimator.ModeKeys.PREDICT:
            # define output message for tensorflow serving
            export_outputs = {'predict_output': tf.estimator.export.PredictOutput({"predictions": predictions})}
            return tf.estimator.EstimatorSpec(mode=mode,
                                              predictions={'predictions': predictions},
                                              export_outputs=export_outputs)
        elif mode == tf.estimator.ModeKeys.EVAL:
            # for evaluation simply use mean squared error
            loss = tf.losses.mean_squared_error(labels=labels, predictions=predictions)
            metrics = {'mse': tf.metrics.mean_squared_error(labels, predictions)}
            return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)
        elif mode == tf.estimator.ModeKeys.TRAIN:
            # train on mse with Adagrad optimizer
            loss = tf.losses.mean_squared_error(labels=labels, predictions=predictions)
            optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
            train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
            return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
        else:
            raise ValueError("unhandled mode: %s" % str(mode))


    def main(_):
        # prepare training data
        default_batch_size = 50
        examples = [{'x': x, 'y': math.sin(x)} for x in [random.random()*2*math.pi for _ in range(10000)]]

        estimator = tf.estimator.Estimator(
            model_fn=my_model,
            config=tf.estimator.RunConfig(model_dir='sin_model', save_summary_steps=100))

        # function converting examples to dataset
        def dataset_fn():
            # returns a dataset serving batched (feature_map, label)-pairs
            # e.g. ({'x': [1.0, 0.3, 1.1...]}, [0.84, 0.29, 0.89...])
            return tf.data.Dataset.from_generator(
                lambda: iter(examples),
                output_types={"x": tf.float32, "y": tf.float32},
                output_shapes={"x": [], "y": []}) \
                .map(lambda x: ({'x': [x['x']]}, [x['y']])) \
                .repeat() \
                .batch(default_batch_size)

        # function to export model to be used for serving
        feature_spec = {'x': tf.FixedLenFeature([1], tf.float32)}

        def serving_input_fn():
            serialized_tf_example = tf.placeholder(dtype=tf.string, shape=[default_batch_size])
            receiver_tensors = {'examples': serialized_tf_example}
            features = tf.parse_example(serialized_tf_example, feature_spec)
            return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

        # train, evaluate and export
        train_spec = tf.estimator.TrainSpec(input_fn=dataset_fn, max_steps=1000)
        eval_spec = tf.estimator.EvalSpec(input_fn=dataset_fn,
                                          steps=100,
                                          exporters=[tf.estimator.FinalExporter('sin', serving_input_fn)])
        tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)


    if __name__ == '__main__':
        tf.app.run(main)

When I run this code in a single process, I get an output folder containing the model checkpoints, the evaluation data, and the model export:

 $ ls sin_model/ checkpoint model.ckpt-0.index eval model.ckpt-0.meta events.out.tfevents.1532426226.simon model.ckpt-1000.data-00000-of-00001 export model.ckpt-1000.index graph.pbtxt model.ckpt-1000.meta model.ckpt-0.data-00000-of-00001 

However, when I distribute the training process (in this test setup only on the local machine), the evaluation and export folders are missing.

I start the individual nodes using the following cluster configuration:

 {"cluster": { "ps": ["localhost:2222"], "chief": ["localhost:2223"], "worker": ["localhost:2224"] } 

The ps server is started as follows:

 $ TF_CONFIG='{"cluster": {"chief": ["localhost:2223"], "worker": ["localhost:2224"], "ps": ["localhost:2222"]}, "task": {"type": "ps", "index": 0}}' CUDA_VISIBLE_DEVICES= python custom_estimator.py 2018-07-24 12:09:04.913967: E tensorflow/stream_executor/cuda/cuda_driver.cc:397] failed call to cuInit: CUDA_ERROR_NO_DEVICE 2018-07-24 12:09:04.914008: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:132] retrieving CUDA diagnostic information for host: simon 2018-07-24 12:09:04.914013: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:139] hostname: simon 2018-07-24 12:09:04.914035: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:163] libcuda reported version is: 384.130.0 2018-07-24 12:09:04.914059: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:167] kernel reported version is: 384.130.0 2018-07-24 12:09:04.914079: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:249] kernel version seems to match DSO: 384.130.0 2018-07-24 12:09:04.914961: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job chief -> {0 -> localhost:2223} 2018-07-24 12:09:04.914971: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222} 2018-07-24 12:09:04.914976: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2224} 2018-07-24 12:09:04.915658: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:369] Started server with target: grpc://localhost:2222 

(CUDA_VISIBLE_DEVICES= is added to the command line to keep the worker and chief from allocating GPU memory. This causes the failed call to cuInit: CUDA_ERROR_NO_DEVICE error, which is, however, not critical.)

The chief is started as follows:

 $ TF_CONFIG='{"cluster": {"chief": ["localhost:2223"], "worker": ["localhost:2224"], "ps": ["localhost:2222"]}, "task": {"type": "chief", "index": 0}}' CUDA_VISIBLE_DEVICES= python custom_estimator.py 2018-07-24 12:09:10.532171: E tensorflow/stream_executor/cuda/cuda_driver.cc:397] failed call to cuInit: CUDA_ERROR_NO_DEVICE 2018-07-24 12:09:10.532234: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:132] retrieving CUDA diagnostic information for host: simon 2018-07-24 12:09:10.532241: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:139] hostname: simon 2018-07-24 12:09:10.532298: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:163] libcuda reported version is: 384.130.0 2018-07-24 12:09:10.532353: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:167] kernel reported version is: 384.130.0 2018-07-24 12:09:10.532359: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:249] kernel version seems to match DSO: 384.130.0 2018-07-24 12:09:10.533195: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job chief -> {0 -> localhost:2223} 2018-07-24 12:09:10.533207: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222} 2018-07-24 12:09:10.533211: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2224} 2018-07-24 12:09:10.533835: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:369] Started server with target: grpc://localhost:2223 2018-07-24 12:09:14.038636: I tensorflow/core/distributed_runtime/master_session.cc:1165] Start master session 71a2748ad69725ae with config: allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } 

And the worker is started as follows:

 $ TF_CONFIG='{"cluster": {"chief": ["localhost:2223"], "worker": ["localhost:2224"], "ps": ["localhost:2222"]}, "task": {"type": "worker", "index": 0}}' CUDA_VISIBLE_DEVICES= python custom_estimator.py 2018-07-24 12:09:13.172260: E tensorflow/stream_executor/cuda/cuda_driver.cc:397] failed call to cuInit: CUDA_ERROR_NO_DEVICE 2018-07-24 12:09:13.172320: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:132] retrieving CUDA diagnostic information for host: simon 2018-07-24 12:09:13.172327: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:139] hostname: simon 2018-07-24 12:09:13.172362: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:163] libcuda reported version is: 384.130.0 2018-07-24 12:09:13.172399: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:167] kernel reported version is: 384.130.0 2018-07-24 12:09:13.172405: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:249] kernel version seems to match DSO: 384.130.0 2018-07-24 12:09:13.173230: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job chief -> {0 -> localhost:2223} 2018-07-24 12:09:13.173242: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job ps -> {0 -> localhost:2222} 2018-07-24 12:09:13.173247: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:215] Initialize GrpcChannelCache for job worker -> {0 -> localhost:2224} 2018-07-24 12:09:13.173783: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:369] Started server with target: grpc://localhost:2224 2018-07-24 12:09:18.774264: I tensorflow/core/distributed_runtime/master_session.cc:1165] Start master session 1d13ac84816fdc80 with config: allow_soft_placement: true graph_options { rewrite_options { meta_optimizer_iterations: ONE } } 

After a short while the chief process stops, and the sin_model folder exists with the model checkpoints, but there is no export or evaluation:

 $ ls sin_model/ checkpoint model.ckpt-0.meta events.out.tfevents.1532426950.simon model.ckpt-1001.data-00000-of-00001 graph.pbtxt model.ckpt-1001.index model.ckpt-0.data-00000-of-00001 model.ckpt-1001.meta model.ckpt-0.index 

Is any additional configuration needed to evaluate or export in the distributed setup?

I am working with Python 3.5 and TensorFlow 1.8.

In distributed mode you can run evaluation in parallel with training by setting the task type to evaluator:

 { "cluster": { "ps": ["localhost:2222"], "chief": ["localhost:2223"], "worker": ["localhost:2224"] }, "task": { "type": "evaluator", "index": 0 }, "environment": "cloud" } 

You do not need to define evaluator inside your cluster definition. Also, I am not sure whether this is related to your case, but maybe setting environment: 'cloud' in your cluster configuration could help.
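As a quick sanity check (a minimal sketch, not part of the original code), tf.estimator.RunConfig parses TF_CONFIG from the environment, so each process can print which role it was assigned before train_and_evaluate is called:

    import tensorflow as tf

    # RunConfig reads TF_CONFIG from the environment and exposes the assigned role.
    config = tf.estimator.RunConfig(model_dir='sin_model', save_summary_steps=100)
    print(config.task_type, config.task_id)  # e.g. 'evaluator' 0 on the evaluator node

On the evaluator node this should report 'evaluator'; only that task runs the evaluation loop and the exporters defined in the EvalSpec, which is why the eval and export folders never appear when only ps, chief, and worker are started.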