iteración sobre una pandas df en paralelo

Por lo tanto, quiero iterar sobre un pandas df en paralelo, así que supongamos que tengo 15 filas, entonces quiero iterar sobre él en paralelo y no una por una.

df: –

df = pd.DataFrame.from_records([ {'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'blhp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'rbswp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'foxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'rbsxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }, {'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' } ]) 

introduzca la descripción de la imagen aquí

Por lo tanto, estoy iterando sobre el df y haciendo la línea de comandos y luego almacenando la salida en un df y haciendo el filtrado de datos y luego finalmente almacenando en influxdb. El problema es que lo estoy haciendo uno por uno mientras lo estoy repitiendo. Lo que quiero iterar sobre todas las filas en paralelo.

A partir de ahora he realizado 20 scripts y uso de multiprocesamiento para repasar todos los scripts en paralelo. Es un dolor cuando tengo que hacer un cambio, ya que tengo que hacerlo en los 20 scripts. Mi script se ve a continuación:

 for index, row in dff.iterrows(): domain = row['domain'] duration = str(row['duration']) media_file = row['media_file'] user = row['user'] channel = row['channel'] cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + duration + ' -f ' + media_file + ' -u .' + user + '. -c sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- test' rows = [shlex.split(line) for line in os.popen( cmda).read().splitlines() if line.strip()] df = pd.DataFrame(rows) """ Bunch of data filteration and pushing it into influx """ 

A partir de ahora tengo 15 scripts si tengo 15 filas en df y hago un parallel processing como el siguiente:

 import os import time from multiprocessing import Process os.chdir('/Users/akumar/vivox-sdk-4.9.0002.30719.ebb523a9') def run_program(cmd): # Function that processes will run os.system(cmd) # Creating command to run commands = ['python testv.py'] commands.extend(['python testv{}.py'.format(i) for i in range(1, 15)]) # Amount of times your programs will run runs = 1 for run in range(runs): # Initiating Processes with desired arguments running_programs = [] for command in commands: running_programs.append(Process(target=run_program, args=(command,))) running_programs[-1].daemon = True # Start our processes simultaneously for program in running_programs: program.start() # Wait untill all programs are done while any(program.is_alive() for program in running_programs): time.sleep(1) 

Pregunta: – ¿Cómo puedo iterar sobre el df y hacer que las 15 filas se ejecuten en paralelo y hacer todas las cosas dentro del bucle for?

Voy a copiar y pegar mi respuesta de Reddit aquí (en caso de que alguien se encuentre con una situación similar):

 import dask.dataframe as ddf def your_function(row): domain = row['domain'] duration = str(row['duration']) media_file = row['media_file'] user = row['user'] channel = row['channel'] cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + duration + ' -f ' + media_file + ' -u .' + user + '. -c sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- test' rows = [shlex.split(line) for line in os.popen( cmda).read().splitlines() if line.strip()] df_dask = ddf.from_pandas(df, npartitions=4) # where the number of partitions is the number of cores you want to use df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing') 

Es posible que tenga que jugar con el parámetro de eje en el método de apply .

En lugar de iniciar 15 procesos, use subprocesos y llame a la función de subprocesos con un argumento. threading.Thread(target=func, args=(i,)) donde i es tu número y func es una función que envuelve todo el código. Luego itera a través de él. No es necesario paralelizar la iteración en 15 elementos.