Tensor Flow shuffle_batch () bloquea al final de la época

Estoy usando tf.train.shuffle_batch () para crear lotes de imágenes de entrada. Incluye un parámetro min_after_dequeue que se asegura de que haya un número específico de elementos dentro de la cola interna, y bloquea todo lo demás si no lo hay.

images, label_batch = tf.train.shuffle_batch( [image, label], batch_size=FLAGS.batch_size, num_threads=num_preprocess_threads, capacity=FLAGS.min_queue_size + 3 * FLAGS.batch_size, min_after_dequeue=FLAGS.min_queue_size) 

Al final de una época, cuando estoy haciendo una evaluación (estoy seguro de que esto también es cierto en el entrenamiento pero no lo he probado), todo se bloquea. Me di cuenta de que en el mismo momento la cola interna de lotes de shuffle se quedaría con menos de min_after_dequeue elementos. En este momento en el progtwig, idealmente, me gustaría quitar los elementos restantes, pero no estoy seguro de cómo hacerlo.

Al parecer, este tipo de locking dentro de las colas de TF se puede desactivar cuando se sabe que no hay más elementos para poner en cola con el método .close (). Sin embargo, dado que la cola subyacente está oculta dentro de la función, ¿cómo llamo a ese método?

Aquí está el código con el que finalmente pude trabajar, aunque con una serie de advertencias de que los elementos que puse en cola se cancelaron.

 lv = tf.constant(label_list) label_fifo = tf.FIFOQueue(len(filenames),tf.int32,shapes=[[]]) # if eval_data: # num_epochs = 1 # else: # num_epochs = None file_fifo = tf.train.string_input_producer(filenames, shuffle=False, capacity=len(filenames)) label_enqueue = label_fifo.enqueue_many([lv]) reader = tf.WholeFileReader() result.key, value = reader.read(file_fifo) image = tf.image.decode_jpeg(value, channels=3) image.set_shape([128,128,3]) result.uint8image = image result.label = label_fifo.dequeue() images, label_batch = tf.train.shuffle_batch( [result.uint8image, result.label], batch_size=FLAGS.batch_size, num_threads=num_preprocess_threads, capacity=FLAGS.min_queue_size + 3 * FLAGS.batch_size, min_after_dequeue=FLAGS.min_queue_size) #in eval file: label_enqueue, images, labels = load_input.inputs() #restre from checkpoint in between coord = tf.train.Coordinator() try: threads = [] for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS): threads.extend(qr.create_threads(sess, coord=coord, daemon=True, start=True)) num_iter = int(math.ceil(FLAGS.num_examples / FLAGS.batch_size)) true_count = 0 # Counts the number of correct predictions. total_sample_count = num_iter * FLAGS.batch_size sess.run(label_enqueue) step = 0 while step < num_iter and not coord.should_stop(): end_epoch = False if step > 0: for qr in tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS): #check if not enough elements in queue size = qr._queue.size().eval() if size - FLAGS.batch_size < FLAGS.min_queue_size: end_epoch = True if end_epoch: #enqueue more so that we can finish sess.run(label_enqueue) #actually run step predictions = sess.run([top_k_op]) 

Tiene razón en que la ejecución de la operación RandomShuffleQueue.close() detendrá los subprocesos de la cola de espera para que no se bloqueen cuando haya menos elementos min_after_dequeue en la cola.

La función tf.train.shuffle_batch() crea un tf.train.QueueRunner que realiza operaciones en la cola en un subproceso en segundo plano. Si lo inicia de la siguiente manera, pasando un tf.train.Coordinator , podrá cerrar la cola limpiamente (según el ejemplo aquí ):

 sess = tf.Session() coord = tf.train.Coordinator() tf.train.start_queue_runners(sess, coord=coord) while not coord.should_stop(): sess.run(train_op) # When done, ask the threads to stop. coord.request_stop() # And wait for them to actually do it. coord.join(threads) 

Hay un argumento opcional allow_smaller_final_batch

“allow_smaller_final_batch: (Opcional) Boolean. Si es True, permite que el lote final sea más pequeño si no hay suficientes elementos en la cola”.