Cómo realizar una captación previa de datos utilizando una función de python personalizada en tensorflow

Estoy tratando de obtener datos de entrenamiento para ocultar la latencia de E / S. Me gustaría escribir código Python personalizado que carga datos del disco y procesa los datos (por ejemplo, agregando una ventana de contexto). En otras palabras, un hilo realiza el procesamiento previo de los datos y el otro hace la capacitación. ¿Es esto posible en TensorFlow?

Actualización: Tengo un ejemplo de trabajo basado en el ejemplo de @rry.

import numpy as np import tensorflow as tf import threading BATCH_SIZE = 5 TRAINING_ITERS = 4100 feature_input = tf.placeholder(tf.float32, shape=[128]) label_input = tf.placeholder(tf.float32, shape=[128]) q = tf.FIFOQueue(200, [tf.float32, tf.float32], shapes=[[128], [128]]) enqueue_op = q.enqueue([label_input, feature_input]) label_batch, feature_batch = q.dequeue_many(BATCH_SIZE) c = tf.reshape(feature_batch, [BATCH_SIZE, 128]) + tf.reshape(label_batch, [BATCH_SIZE, 128]) sess = tf.Session() def load_and_enqueue(sess, enqueue_op, coord): with open('dummy_data/features.bin') as feature_file, open('dummy_data/labels.bin') as label_file: while not coord.should_stop(): feature_array = np.fromfile(feature_file, np.float32, 128) if feature_array.shape[0] == 0: print('reach end of file, reset using seek(0,0)') feature_file.seek(0,0) label_file.seek(0,0) continue label_value = np.fromfile(label_file, np.float32, 128) sess.run(enqueue_op, feed_dict={feature_input: feature_array, label_input: label_value}) coord = tf.train.Coordinator() t = threading.Thread(target=load_and_enqueue, args=(sess,enqueue_op, coord)) t.start() for i in range(TRAINING_ITERS): sum = sess.run(c) print('train_iter='+str(i)) print(sum) coord.request_stop() coord.join([t]) 

Este es un caso de uso común, y la mayoría de las implementaciones utilizan las colas de TensorFlow para desacoplar el código de preprocesamiento del código de entrenamiento. Hay un tutorial sobre cómo usar las colas , pero los pasos principales son los siguientes:

  1. Defina una cola, q , que almacenará los datos preprocesados. TensorFlow admite la tf.FIFOQueue simple que produce elementos en el orden en que se pusieron en cola, y la tf.RandomShuffleQueue más avanzada que produce elementos en un orden aleatorio. Un elemento de cola es una tupla de uno o más tensores (que pueden tener diferentes tipos y formas). Todas las colas admiten operaciones de un solo elemento ( dequeue , dequeue ) y de lote ( enqueue_many , dequeue_many ), pero para usar las operaciones por lotes debe especificar las formas de cada tensor en un elemento de cola al construir la cola.

  2. Cree un subgrafo que ponga en cola elementos preprocesados ​​en la cola. Una forma de hacer esto sería definir algunas tf.placeholder() para los tensores correspondientes a un solo ejemplo de entrada, luego pasarlas a q.enqueue() . (Si su preprocesamiento produce un lote a la vez, debe usar q.enqueue_many() lugar). También puede incluir operaciones de TensorFlow en este subgrafo.

  3. Construye un subgrafo que realice entrenamiento. Esto se verá como un gráfico regular de TensorFlow, pero obtendrá su entrada llamando a q.dequeue_many(BATCH_SIZE) .

  4. Comience su sesión.

  5. Cree uno o más subprocesos que ejecuten su lógica de preprocesamiento, luego ejecute la operación de puesta en cola, alimentando los datos preprocesados. Puede encontrar las clases de utilidad tf.train.Coordinator y tf.train.QueueRunner útiles para esto.

  6. Ejecute su gráfico de entrenamiento (optimizador, etc.) como de costumbre.

EDIT: Aquí hay una función simple load_and_enqueue() y un fragmento de código para comenzar:

 # Features are length-100 vectors of floats feature_input = tf.placeholder(tf.float32, shape=[100]) # Labels are scalar integers. label_input = tf.placeholder(tf.int32, shape=[]) # Alternatively, could do: # feature_batch_input = tf.placeholder(tf.float32, shape=[None, 100]) # label_batch_input = tf.placeholder(tf.int32, shape=[None]) q = tf.FIFOQueue(100, [tf.float32, tf.int32], shapes=[[100], []]) enqueue_op = q.enqueue([feature_input, label_input]) # For batch input, do: # enqueue_op = q.enqueue_many([feature_batch_input, label_batch_input]) feature_batch, label_batch = q.dequeue_many(BATCH_SIZE) # Build rest of model taking label_batch, feature_batch as input. # [...] train_op = ... sess = tf.Session() def load_and_enqueue(): with open(...) as feature_file, open(...) as label_file: while True: feature_array = numpy.fromfile(feature_file, numpy.float32, 100) if not feature_array: return label_value = numpy.fromfile(feature_file, numpy.int32, 1)[0] sess.run(enqueue_op, feed_dict={feature_input: feature_array, label_input: label_value}) # Start a thread to enqueue data asynchronously, and hide I/O latency. t = threading.Thread(target=load_and_enqueue) t.start() for _ in range(TRAINING_EPOCHS): sess.run(train_op) 

En otras palabras, un hilo realiza el procesamiento previo de los datos y el otro hace la capacitación. ¿Es esto posible en TensorFlow?

Sí lo es. La solución de mrry funciona, pero existe más simple.

Recuperacion de datos

tf.py_func envuelve una función python y la utiliza como un operador TensorFlow. Así que podemos cargar los datos en sess.run() cada vez. El problema con este enfoque es que los datos se cargan durante sess.run() través del hilo principal.

Un ejemplo mínimo:

 def get_numpy_tensor(): return np.array([[1,2],[3,4]], dtype=np.float32) tensorflow_tensor = tf.py_func(get_numpy_tensor, [], tf.float32) 

Un ejemplo más complejo:

 def get_numpy_tensors(): # Load data from the disk into numpy arrays. input = np.array([[1,2],[3,4]], dtype=np.float32) target = np.int32(1) return input, target tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) tensorflow_input, tensorflow_target = 2*tensorflow_input, 2*tensorflow_target sess = tf.InteractiveSession() numpy_input, numpy_target = sess.run([tensorflow_input, tensorflow_target]) assert np.all(numpy_input==np.array([[2,4],[6,8]])) and numpy_target==2 

Recogiendo datos en otro hilo

Para sess.run() en cola nuestros datos en otro hilo (para que sess.run() no tenga que esperar los datos), podemos usar tf.train.batch() en nuestros operadores desde tf.py_func() .

Un ejemplo mínimo:

 tensor_shape = get_numpy_tensor().shape tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32, shapes=[tensor_shape]) # Run `tf.train.start_queue_runners()` once session is created. 

Podemos omitir las shapes argumento si tensorflow_tensor tiene su forma especificada:

 tensor_shape = get_numpy_tensor().shape tensorflow_tensor.set_shape(tensor_shape) tensorflow_tensors = tf.train.batch([tensorflow_tensor], batch_size=32) # Run `tf.train.start_queue_runners()` once session is created. 

Un ejemplo más complejo:

 input_shape, target_shape = (2, 2), () def get_numpy_tensors(): input = np.random.rand(*input_shape).astype(np.float32) target = np.random.randint(10, dtype=np.int32) print('f', end='') return input, target tensorflow_input, tensorflow_target = tf.py_func(get_numpy_tensors, [], [tf.float32, tf.int32]) batch_size = 2 tensorflow_inputs, tensorflow_targets = tf.train.batch([tensorflow_input, tensorflow_target], batch_size, shapes=[input_shape, target_shape], capacity=2) # Internal queue will contain at most `capasity=2` times `batch_size=2` elements `[tensorflow_input, tensorflow_target]`. tensorflow_inputs, tensorflow_targets = 2*tensorflow_inputs, 2*tensorflow_targets sess = tf.InteractiveSession() tf.train.start_queue_runners() # Internally, `tf.train.batch` uses a QueueRunner, so we need to ask tf to start it. for _ in range(10): numpy_inputs, numpy_targets = sess.run([tensorflow_inputs, tensorflow_targets]) assert numpy_inputs.shape==(batch_size, *input_shape) and numpy_targets.shape==(batch_size, *target_shape) print('r', end='') # Prints `fffffrrffrfrffrffrffrffrffrffrf`. 

En caso de que get_numpy_tensor() devuelva un lote de tensores, entonces tf.train.batch(..., enqueue_many=True) ayudará.