Paralelice tf.from_generator usando tf.contrib.data.parallel_interleave

Tengo un montón de archivos de matriz JSON (AVRO para ser precisos) y cada uno de ellos produce varias muestras para entrenar un modelo Keras. Usando ideas de @GPhilo y de @jsimsa , pude encontrar esto para paralelizar mi canal de entrada. No se puede averiguar cómo diseñar el generator(n) para dividir el trabajo de procesamiento de archivos. El código falla dentro de parse_file(f) ya que la función espera una ruta de archivo de cadena y no un Tensor ,

 N = num_cores = 2 files_to_process = ["f1.avro", "f2.avro", "f3.avro"] shuffle_size = prefetch_buffer = 1000 batch_size = 512 def generator(n): size = math.ceil(len(files_to_process) / N) start_index = n * size end_index = start_index + size def gen(): # for f in files_to_process[start_index:end_index]: for f in tf.slice(files_to_process, start_index, size): yield f return gen def dataset(n): return tf.data.Dataset.from_generator(generator(n), (tf.string,)) def process_file(f): examples_x, examples_y = parse_file(f) return examples_x, examples_y ds = tf.data.Dataset.range(N) ds = ds.apply(tf.contrib.data.parallel_interleave(dataset, cycle_length=N)) ds = ds.map(process_file, num_parallel_calls=N) ds = ds.prefetch(prefetch_buffer) ds = ds.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x)) ds = ds.batch(batch_size).shuffle(shuffle_size) ... myTfKerasModel.fit( ds.make_one_iterator(), NUM_TRAIN_SAMPLES // batch_size ) 
  • ¿Cuál es la forma correcta de diseñar el generator(n) aquí?
  • ¿Es esta una forma optimizada de diseñar mi flat_map entrada usando parallel_interleave y flat_map

Me parece que estás complicando tu vida innecesariamente con el generador. Así es como implementaría su canal de entrada:

 def parse_file_tf(filename): return tf.py_func(parse_file, [filename], [tf.float32, tf.float32]) # version with map files = tf.data.Dataset.from_tensor_slices(files_to_process) dataset = files.map(parse_file_tf, num_parallel_calls=N) dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x)) dataset = dataset.batch(batch_size).shuffle(shuffle_size).prefetch(2) it = dataset.make_one_shot_iterator() 

Para probarlo, defino un parse_file simulado como:

 i=0 def parse_file(f): global i i += 1 return np.asarray([i]*i, dtype=np.float32), np.asarray([i]*i, dtype=np.float32) # mimicks variable-length examples_x, examples_y 

que alimente en un bucle básico que muestra lo que el iterador devuelve:

 sess = tf.Session() try: while True: x, y = it.get_next() vx, vy = sess.run([x,y]) print(vx) print(vy) except tf.errors.OutOfRangeError: pass sess.close() 

Ejecutando el código de arriba se imprime:

 [2. 3. 2. 1. 3. 3.] [2. 3. 2. 1. 3. 3.] 

Explicación de la tubería

Esencialmente, dejo el problema de la paralelización para map , donde puedo pasar el número de subprocesos que debe ejecutar. No hay necesidad de iteradores de generadores sobre rangos y esas complicaciones adicionales.

Elegí el mapa en parallel_interleave porque este último requiere que genere una instancia de Dataset para cada elemento que devuelve, lo que en su caso no tiene sentido porque ya ha cargado todos los valores en la memoria cuando ejecuta parse_file . parallel_interleave tiene sentido si genera los valores lentamente (por ejemplo, aplicando tf.data.TFRecordDataset a una lista de nombres de archivos), pero si su conjunto de datos encaja en la memoria, vaya al map .

Sobre las limitaciones de tf.py_func , no afectan su red capacitada, solo el canal de entrada. Idealmente, tendrá un canal diferente para su capacitación y para su uso final de la red. Solo debe ocuparse de las limitaciones durante este último, mientras que para la capacitación (a menos que haga algo muy específico con la capacitación distribuida y / o la transferencia de la capacitación a través de las máquinas) es razonablemente seguro.


Versión con generador

Si sus archivos JSON son muy grandes y su contenido no cabe en la memoria, puede usar un generador, pero un poco diferente del enfoque con el que comenzó. La idea es que el generador pase por el archivo JSON y yield un registro a la vez. Entonces, el generador tiene que ser su función parse_file . Como ejemplo, supongamos que tiene el siguiente generador de parse_file :

 i = 3 def parse_file(filename): global i i += 1 ctr = 0 while ctr < i: yield ctr, ctr 

En este caso, la tubería se vería de la siguiente manera:

 def wrap_generator(filename): return tf.data.Dataset.from_generator(parse_file(filename), [tf.int32, tf.int32]) files = tf.data.Dataset.from_tensor_slices(files_to_process) dataset = files.apply(tf.contrib.data.parallel_interleave(wrap_generator, cycle_length=N)) dataset = dataset.flat_map(lambda *x: tf.data.Dataset.from_tensor_slices(x)) dataset = dataset.shuffle(shuffle_size).batch(batch_size).prefetch(2) it = dataset.make_one_shot_iterator() 

Tenga en cuenta que aquí necesitamos usar la función de parallel_interleave porque convertimos los generadores en instancias de Dataset de las que extraemos valores. El rest sigue igual.

Alimentar esto al mismo bucle de muestra que las impresiones anteriores:

 [6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.] [6. 5. 4. 4. 6. 5. 6. 6. 5. 4. 6. 4. 5. 5. 6.]