¿Qué está pasando en tf.train.shuffle_batch y `tf.train.batch?

Uso datos binarios para entrenar un DNN.

Pero tf.train.shuffle_batch y tf.train.batch me confunden.

Este es mi código y haré algunas pruebas al respecto.

Primero Using_Queues_Lib.py :

 from __future__ import absolute_import from __future__ import division from __future__ import print_function import os from six.moves import xrange # pylint: disable=redefined-builtin import tensorflow as tf NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN = 100 REAL32_BYTES=4 def read_dataset(filename_queue,data_length,label_length): class Record(object): pass result = Record() result_data = data_length*REAL32_BYTES result_label = label_length*REAL32_BYTES record_bytes = result_data + result_label reader = tf.FixedLengthRecordReader(record_bytes=record_bytes) result.key, value = reader.read(filename_queue) record_bytes = tf.decode_raw(value, tf.float32) result.data = tf.strided_slice(record_bytes, [0],[data_length])#record_bytes: tf.float list result.label = tf.strided_slice(record_bytes, [data_length],[data_length+label_length]) return result def _generate_data_and_label_batch(data, label, min_queue_examples,batch_size, shuffle): num_preprocess_threads = 16 #only speed code if shuffle: data_batch, label_batch = tf.train.shuffle_batch([data, label],batch_size=batch_size,num_threads=num_preprocess_threads,capacity=min_queue_examples + batch_size,min_after_dequeue=min_queue_examples) else: data_batch, label_batch = tf.train.batch([data, label],batch_size=batch_size,num_threads=num_preprocess_threads,capacity=min_queue_examples + batch_size) return data_batch, label_batch def inputs(data_dir, batch_size,data_length,label_length): filenames = [os.path.join(data_dir, 'test_data_SE.dat')] for f in filenames: if not tf.gfile.Exists(f): raise ValueError('Failed to find file: ' + f) filename_queue = tf.train.string_input_producer(filenames) read_input = read_dataset(filename_queue,data_length,label_length) read_input.data.set_shape([data_length]) #important read_input.label.set_shape([label_length]) #important min_fraction_of_examples_in_queue = 0.4 min_queue_examples = int(NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN * min_fraction_of_examples_in_queue) print ('Filling queue with %d samples before starting to train. ' 'This will take a few minutes.' % min_queue_examples) return _generate_data_and_label_batch(read_input.data, read_input.label, min_queue_examples, batch_size, shuffle=True) 

Segundo Using_Queues.py :

 import Using_Queues_Lib import tensorflow as tf import numpy as np import time max_steps=10 batch_size=100 data_dir=r'.' data_length=2 label_length=1 #-----------Save paras----------- import struct def WriteArrayFloat(file,data): fout=open(file,'wb') fout.write(struct.pack('<'+str(data.flatten().size)+'f', *data.flatten().tolist())) fout.close() #----------------------------- def add_layer(inputs, in_size, out_size, activation_function=None): Weights = tf.Variable(tf.truncated_normal([in_size, out_size])) biases = tf.Variable(tf.zeros([1, out_size]) + 0.1) Wx_plus_b = tf.matmul(inputs, Weights) + biases if activation_function is None: outputs = Wx_plus_b else: outputs = activation_function(Wx_plus_b) return outputs data_train,labels_train=Using_Queues_Lib.inputs(data_dir=data_dir, batch_size=batch_size,data_length=data_length, label_length=label_length) xs=tf.placeholder(tf.float32,[None,data_length]) ys=tf.placeholder(tf.float32,[None,label_length]) l1 = add_layer(xs, data_length, 5, activation_function=tf.nn.sigmoid) l2 = add_layer(l1, 5, 5, activation_function=tf.nn.sigmoid) prediction = add_layer(l2, 5, label_length, activation_function=None) loss = tf.reduce_mean(tf.square(ys - prediction)) train_step = tf.train.GradientDescentOptimizer(0.2).minimize(loss) sess=tf.InteractiveSession() tf.global_variables_initializer().run() tf.train.start_queue_runners() for i in range(max_steps): start_time=time.time() data_batch,label_batch=sess.run([data_train,labels_train]) sess.run(train_step, feed_dict={xs: data_batch, ys: label_batch}) duration=time.time()-start_time if i % 1 == 0: example_per_sec=batch_size/duration sec_pec_batch=float(duration) WriteArrayFloat(r'./data/'+str(i)+'.bin', np.concatenate((data_batch,label_batch),axis=1)) format_str=('step %d,loss=%.8f(%.1f example/sec;%.3f sec/batch)') loss_value=sess.run(loss, feed_dict={xs: data_batch, ys: label_batch}) print(format_str%(i,loss_value,example_per_sec,sec_pec_batch)) 

Los datos aquí . Y es generado por Mathematica .

 data = Flatten@Table[{x, y, x*y}, {x, -1, 1, .05}, {y, -1, 1, .05}]; BinaryWrite[file, mydata, "Real32", ByteOrdering -> -1]; Close[file]; 

Longitud de datos: 1681

Los datos se ven así:

introduzca la descripción de la imagen aquí

trazar los datos: el color rojo a verde significa el momento en que ocurrieron aquí

introduzca la descripción de la imagen aquí

Ejecute Using_Queues.py , producirá diez lotes, y dibujo cada bach en este gráfico 🙁 batch_size=100 y min_queue_examples=40 ) introduzca la descripción de la imagen aquí

Si batch_size=1024 y min_queue_examples=40 : introduzca la descripción de la imagen aquí

Si batch_size=100 y min_queue_examples=4000 : introduzca la descripción de la imagen aquí

Si batch_size=1024 y min_queue_examples=4000 : introduzca la descripción de la imagen aquí

E incluso si batch_size = 1681 y min_queue_examples=4000 : introduzca la descripción de la imagen aquí

La región no está llena de puntos.

¿Por qué?

Entonces, ¿por qué cambiar los min_queue_examples hacen más aleatorios? ¿Cómo determinar el valor min_queue_examples ?

¿Qué está pasando en tf.train.shuffle_batch ?

La función de muestreo que tf.train.shuffle_batch() (y, por tf.RandomShuffleQueue tanto, tf.RandomShuffleQueue ) es un poco sutil. La implementación utiliza tf.RandomShuffleQueue.dequeue_many(batch_size) , cuya implementación (simplificada) es la siguiente:

  • Mientras que el número de elementos en cola es menor que batch_size :
    • Espere hasta que la cola contenga al menos min_after_dequeue + 1 elementos.
    • Seleccione un elemento de la cola uniformemente al azar, elimínelo de la cola y añádalo al lote de salida.

La otra cosa a tener en cuenta es cómo se agregan los elementos a la cola, que utiliza un hilo de fondo que ejecuta tf.RandomShuffleQueue.enqueue() en la misma cola:

  • Espere hasta que el tamaño actual de la cola sea menor que su capacity .
  • Agregue el elemento a la cola.

Como resultado, la capacity y min_after_dequeue propiedades min_after_dequeue de la cola (más la distribución de los datos de entrada en cola) determinan la población de la que se tf.train.shuffle_batch() . Parece que los datos en tus archivos de entrada están ordenados, por lo que estás confiando completamente en la función tf.train.shuffle_batch() para la aleatoriedad.

Tomando sus visualizaciones a su vez:

  1. Si la capacity y la min_after_dequeue son pequeñas en relación con el conjunto de datos, “shuffling” seleccionará elementos aleatorios de una pequeña población que se asemeja a una “ventana deslizante” a través del conjunto de datos. Con un poco de probabilidad, verá elementos antiguos en el lote de salida en cola.

  2. Si batch_size es grande y min_after_dequeue es pequeño en relación con el conjunto de datos, “shuffling” se seleccionará nuevamente desde una pequeña “ventana deslizante” a través del conjunto de datos.

  3. Si min_after_dequeue es grande en relación con batch_size y el tamaño del conjunto de datos, verá (aproximadamente) muestras uniformes de los datos en cada lote.

  4. Si min_after_dequeue y batch_size son grandes en relación con el tamaño del conjunto de datos, verá (aproximadamente) muestras uniformes de los datos en cada lote.

  5. En el caso de que min_after_dequeue sea ​​4000, y batch_size sea ​​1681, tenga en cuenta que el número esperado de copias de cada elemento en la cola cuando muestrea es 4000 / 1681 = 2.38 , por lo que es más probable que algunos elementos se muestreen más de una vez. (y menos probable que muestre cada elemento único exactamente una vez).

shuffle_batch no es más que una implementación RandomShuffleQueue de asincronismo. Primero deberá comprender qué es el asincronismo. Luego, shuffle_batch debería ser muy sencillo de entender, con un poco de ayuda con los documentos oficiales ( https://www.tensorflow.org/versions/r1.3/programmers_guide/threading_and_queues ). Supongamos que desea diseñar un sistema que pueda leer y escribir datos al mismo tiempo. La mayoría de la gente lo diseñó como tal:

1) crear un hilo para leer datos y un hilo para escribir datos. el hilo de lectura eliminará un elemento de la cola para lectura (dequeue) y el hilo de escritura agregará un elemento a la cola como resultado de la escritura (en cola).

2) use las colas de locking para administrar la sincronización entre los subprocesos de lectura y escritura, porque no desea que el hilo de lectura esté leyendo los mismos datos que el hilo de escritura, y cuando la cola está vacía, el hilo de lectura debe estar colgado ( bloqueado) para esperar a que los datos se escriban (en cola) por el hilo de escritura, y cuando la cola está llena, el hilo de escritura debe esperar a que el hilo de lectura saque los datos de la cola (salida). En la tubería de entrada de tensorflow, las cosas no son diferentes. Básicamente hay dos conjuntos de subprocesos funcionando. Uno es agregar ejemplos de entrenamiento a una cola y el otro es responsable de tomar ejemplos de entrenamiento de la cola para entrenamiento. Así es exactamente cómo se diseñan slice_input_producer, string_input_producer, shuffle_batch.

Te escribí un pequeño progtwig para desglosar las cosas para que entiendas el flujo de entrada de tensorflow, shuffle_batch y el efecto de los parámetros de min_after_dequeue y batch_size:

 import tensorflow as tf import numpy as np test_size = 2000 input_data = tf.range(test_size) xi = [x for x in range(0, test_size, 50)[1:]] yi = [int(test_size * x) for x in np.array(range(1, 100, 5)) / 100.0] zi = np.zeros(shape=(len(yi), len(xi))) with tf.Session() as sess: for idx, batch_size in enumerate(xi): for idy, min_after_dequeue in enumerate(yi): # synchronization example 1: create a fifo queue, one thread is # adding many training examples at a time to the queue, and the other # is taking one example at a time out of the queue. # this is similar to what slice_input_producer does. fifo_q = tf.FIFOQueue(capacity=test_size, dtypes=tf.int32, shapes=[[]]) en_fifo_q = fifo_q.enqueue_many(input_data) single_data = fifo_q.dequeue() # synchronization example 2: create a random shuffle queue, one thread is # adding one training example at a time to the queue, and the other # is taking many examples as a batch at a time out of the queue. # this is similar to what shuffle_batch does. rf_queue = tf.RandomShuffleQueue(capacity=test_size, min_after_dequeue=min_after_dequeue, shapes=single_data._shape, dtypes=single_data._dtype) rf_enqueue = rf_queue.enqueue(single_data) batch_data = rf_queue.dequeue_many(batch_size) # now let's creating threads for enqueue operations(writing thread). # enqueue threads have to be started at first, the tf session will # take care of your training(reading thread) which will be running when you call sess.run. # the tf coordinators are nothing but threads managers that take care of the life cycle # for created threads qr_fifo = tf.train.QueueRunner(fifo_q, [en_fifo_q] * 8) qr_rf = tf.train.QueueRunner(rf_queue, [rf_enqueue] * 4) coord = tf.train.Coordinator() fifo_queue_threads = qr_fifo.create_threads(sess, coord=coord, start=True) rf_queue_threads = qr_rf.create_threads(sess, coord=coord, start=True) shuffle_pool = [] num_steps = int(np.ceil(test_size / float(batch_size))) for i in range(num_steps): shuffle_data = sess.run([batch_data]) shuffle_pool.extend(shuffle_data[0].tolist()) # evaluating unique_rate of each combination of batch_size and min_after_dequeue # unique rate 1.0 indicates each example is shuffled uniformly. # unique rate < 1.0 means that some examples are shuffled twice. unique_rate = len(np.unique(shuffle_pool)) / float(test_size) print min_after_dequeue, batch_size, unique_rate zi[idy, idx] = unique_rate # stop threads. coord.request_stop() coord.join(rf_queue_threads) coord.join(fifo_queue_threads) print xi, yi, zi plt.clf() plt.title('shuffle_batch_example') plt.ylabel('num_dequeue_ratio') plt.xlabel('batch_size') xxi, yyi = np.meshgrid(xi, yi) plt.pcolormesh(xxi, yyi, zi) plt.colorbar() plt.show() 

Si ejecuta el código anterior, debería ver el gráfico: shuffle_batch_example

podemos ver claramente que cuando batch_size aumenta, la unique_rate aumenta, y cuando min_after_dequeue se hace más pequeña, la tasa única aumenta. La tasa única es un indicador que calculo para monitorear cuántas muestras duplicadas se generan al vuelo de shuffle_batch en mini-lotes.

Usa decode_raw para leer datos en bruto.

 float_values = tf.decode_raw(data, tf.float32, little_endian=True)