Python Pandas Multiprocessing Apply

Me pregunto si hay una manera de hacer una función de aplicación de dataframe de pandas en paralelo. He mirado a mi alrededor y no he encontrado nada. Al menos en teoría, creo que debería ser bastante simple de implementar pero no he visto nada. Después de todo, esta es prácticamente la definición de libro de texto. ¿Alguien más ha intentado esto o sabe de alguna manera? Si nadie tiene ideas, creo que podría intentar escribirlas yo mismo.

El código con el que estoy trabajando está abajo. Lo siento por la falta de declaraciones de importación. Se mezclan con muchas otras cosas.

def apply_extract_entities(row): names=[] counter=0 print row for sent in nltk.sent_tokenize(open(row['file_name'], "r+b").read()): for chunk in nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(sent))): if hasattr(chunk, 'node'): names+= [chunk.node, ' '.join(c[0] for c in chunk.leaves())] counter+=1 print counter return names data9_2['proper_nouns']=data9_2.apply(apply_extract_entities, axis=1) 

EDITAR:

Así que aquí es lo que he intentado. Intenté ejecutarlo solo con los primeros cinco elementos de mi iterable y está tomando más tiempo de lo que haría si lo ejecutara en serie, así que asumo que no está funcionando.

 os.chdir(str(home)) data9_2=pd.read_csv('edgarsdc3.csv') os.chdir(str(home)+str('//defmtest')) #import stuff from nltk import pos_tag, ne_chunk from nltk.tokenize import SpaceTokenizer #define apply function and apply it os.chdir(str(home)+str('//defmtest')) #### #this is our apply function def apply_extract_entities(row): names=[] counter=0 print row for sent in nltk.sent_tokenize(open(row['file_name'], "r+b").read()): for chunk in nltk.ne_chunk(nltk.pos_tag(nltk.word_tokenize(sent))): if hasattr(chunk, 'node'): names+= [chunk.node, ' '.join(c[0] for c in chunk.leaves())] counter+=1 print counter return names #need something that populates a list of sections of a dataframe def dataframe_splitter(df): df_list=range(len(df)) for i in xrange(len(df)): sliced=df.ix[i] df_list[i]=sliced return df_list df_list=dataframe_splitter(data9_2) #df_list=range(len(data9_2)) print df_list #the multiprocessing section import multiprocessing def worker(arg): print arg (arg)['proper_nouns']=arg.apply(apply_extract_entities, axis=1) return arg pool = multiprocessing.Pool(processes=10) # get list of pieces res = pool.imap_unordered(worker, df_list[:5]) res2= list(itertools.chain(*res)) pool.close() pool.join() # re-assemble pieces into the final output output = data9_2.head(1).concatenate(res) print output.head() 

Con el multiprocesamiento, es mejor generar varios bloques de datos grandes y luego volver a ensamblarlos para producir la salida final.

fuente

 import multiprocessing def worker(arg): return arg*2 pool = multiprocessing.Pool() # get list of pieces res = pool.map(worker, [1,2,3]) pool.close() pool.join() # re-assemble pieces into the final output output = sum(res) print 'got:',output 

salida

 got: 12