Cómo manejar los datos entrantes en tiempo real con los pandas de Python

¿Cuál es la forma más recomendada / pythonic de manejar los datos entrantes en vivo con pandas?

Cada pocos segundos recibo un punto de datos en el siguiente formato:

{'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0} 

Me gustaría agregarlo a un DataFrame existente y luego realizar un análisis en él.

El problema es que solo agregar filas con DataFrame.append puede llevar a problemas de rendimiento con todas esas copias.

Cosas que he probado:

Algunas personas sugirieron preasignar un gran DataFrame y actualizarlo a medida que ingresan los datos:

 In [1]: index = pd.DatetimeIndex(start='2013-01-01 00:00:00', freq='S', periods=5) In [2]: columns = ['high', 'low', 'open', 'close'] In [3]: df = pd.DataFrame(index=t, columns=columns) In [4]: df Out[4]: high low open close 2013-01-01 00:00:00 NaN NaN NaN NaN 2013-01-01 00:00:01 NaN NaN NaN NaN 2013-01-01 00:00:02 NaN NaN NaN NaN 2013-01-01 00:00:03 NaN NaN NaN NaN 2013-01-01 00:00:04 NaN NaN NaN NaN In [5]: data = {'time' :'2013-01-01 00:00:02', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0} In [6]: data_ = pd.Series(data) In [7]: df.loc[data['time']] = data_ In [8]: df Out[8]: high low open close 2013-01-01 00:00:00 NaN NaN NaN NaN 2013-01-01 00:00:01 NaN NaN NaN NaN 2013-01-01 00:00:02 4 3 2 1 2013-01-01 00:00:03 NaN NaN NaN NaN 2013-01-01 00:00:04 NaN NaN NaN NaN 

La otra alternativa es construir una lista de dictados. Simplemente agregue los datos entrantes a una lista y divídalos en DataFrames más pequeños para hacer el trabajo.

 In [9]: ls = [] In [10]: for n in range(5): .....: # Naive stuff ahead =) .....: time = '2013-01-01 00:00:0' + str(n) .....: d = {'time' : time, 'stock' : 'BLAH', 'high' : np.random.rand()*10, 'low' : np.random.rand()*10, 'open' : np.random.rand()*10, 'close' : np.random.rand()*10} .....: ls.append(d) In [11]: df = pd.DataFrame(ls[1:3]).set_index('time') In [12]: df Out[12]: close high low open stock time 2013-01-01 00:00:01 3.270078 1.008289 7.486118 2.180683 BLAH 2013-01-01 00:00:02 3.883586 2.215645 0.051799 2.310823 BLAH 

o algo así, tal vez procesando la entrada un poco más.

Yo usaría HDF5 / pytables de la siguiente manera:

  1. Mantenga los datos como una lista de python “el mayor tiempo posible”.
  2. Agregue sus resultados a esa lista.
  3. Cuando se pone “grande”:
    • push to HDF5 Store utilizando pandas io (y una tabla anexable).
    • Borrar la lista.
  4. Repetir.

De hecho, la función que defino utiliza una lista para cada “clave” para que pueda almacenar múltiples DataFrames en la tienda HDF5 en el mismo proceso.


Definimos una función a la que llamas con cada fila d :

 CACHE = {} STORE = 'store.h5' # Note: another option is to keep the actual file open def process_row(d, key, max_len=5000, _cache=CACHE): """ Append row d to the store 'key'. When the number of items in the key's cache reaches max_len, append the list of rows to the HDF5 store and clear the list. """ # keep the rows for each key separate. lst = _cache.setdefault(key, []) if len(lst) >= max_len: store_and_clear(lst, key) lst.append(d) def store_and_clear(lst, key): """ Convert key's cache list to a DataFrame and append that to HDF5. """ df = pd.DataFrame(lst) with pd.HDFStore(STORE) as store: store.append(key, df) lst.clear() 

Nota: usamos la instrucción with para cerrar automáticamente la tienda después de cada escritura. Es posible que sea ​​más rápido mantenerlo abierto, pero si es así, se recomienda que se lave con regularidad (cierre de las ollas) . También tenga en cuenta que puede ser más legible usar un deque de colecciones en lugar de una lista, pero el rendimiento de una lista será ligeramente mejor aquí.

Para usar esto llamas como:

 process_row({'time' :'2013-01-01 00:00:00', 'stock' : 'BLAH', 'high' : 4.0, 'low' : 3.0, 'open' : 2.0, 'close' : 1.0}, key="df") 

Nota: “df” es la clave almacenada utilizada en la tienda de pytables.

Una vez que el trabajo haya terminado, asegúrate de store_and_clear y store_and_clear el caché restante:

 for k, lst in CACHE.items(): # you can instead use .iteritems() in python 2 store_and_clear(lst, k) 

Ahora su completo DataFrame está disponible a través de:

 with pd.HDFStore(STORE) as store: df = store["df"] # other keys will be store[key] 

Algunos comentarios:

  • 5000 se pueden ajustar, intente con algunos números más pequeños / más grandes para satisfacer sus necesidades.
  • El apéndice de la lista es O (1) , el apéndice de DataFrame es O ( len(df) ).
  • Hasta que no estés haciendo estadísticas o sin recostackr datos, no necesitas pandas, usa lo más rápido.
  • Este código funciona con varias claves (puntos de datos) que entran.
  • Este es un código muy pequeño, y nos quedamos en la lista de python de vainilla y luego en el dataframe de pandas …

Además, para obtener las lecturas actualizadas, puede definir un método de obtención que almacene y borre antes de leer. De esta forma obtendría los datos más actualizados:

 def get_latest(key, _cache=CACHE): store_and_clear(_cache[key], key) with pd.HDFStore(STORE) as store: return store[key] 

Ahora cuando accedas con:

 df = get_latest("df") 

obtendrás el último “df” disponible.


Otra opción es un poco más complicada: definir una tabla personalizada en vainilla pytables, consulte el tutorial .

Nota: Debe conocer los nombres de campo para crear el descriptor de columna .

En realidad, está intentando resolver dos problemas: capturar datos en tiempo real y analizarlos. El primer problema se puede resolver con el registro de Python , que está diseñado para este propósito. Luego, el otro problema se puede resolver leyendo el mismo archivo de registro.