Ejecutar siempre un número constante de subprocesos en paralelo.

Quiero usar subprocesos para permitir que 20 instancias de un script escrito se ejecuten en paralelo. Digamos que tengo una gran lista de urls con como 100.000 entradas y mi progtwig debería controlar que en todo momento 20 instancias de mi script estén trabajando en esa lista. Quería codificarlo de la siguiente manera:

urllist = [url1, url2, url3, .. , url100000] i=0 while number_of_subproccesses < 20 and i<100000: subprocess.Popen(['python', 'script.py', urllist[i]] i = i+1 

Mi script solo escribe algo en una base de datos o archivo de texto. No genera nada y no necesita más entrada que la URL.

Mi problema es que no pude encontrar algo sobre cómo obtener la cantidad de subprocesos que están activos. Soy un progtwigdor novato, así que cualquier sugerencia y sugerencia es bienvenida. También me preguntaba cómo puedo administrarlo una vez que se hayan cargado los 20 subprocesos para que el bucle while verifique las condiciones nuevamente. Pensé en tal vez poner otro bucle de tiempo sobre él, algo así como

 while i<100000 while number_of_subproccesses < 20: subprocess.Popen(['python', 'script.py', urllist[i]] i = i+1 if number_of_subprocesses == 20: sleep() # wait to some time until check again 

¿O tal vez existe la posibilidad de que el bucle while compruebe siempre el número de subprocesos?

También consideré usar el módulo de multiprocesamiento, pero me pareció muy conveniente simplemente llamar a script.py con subprocesamiento en lugar de una función con multiprocesamiento.

Tal vez alguien pueda ayudarme y guiarme en la dirección correcta. ¡Muchas gracias!

Tomando un enfoque diferente del anterior, ya que parece que la callback no se puede enviar como un parámetro:

 NextURLNo = 0 MaxProcesses = 20 MaxUrls = 100000 # Note this would be better to be len(urllist) Processes = [] def StartNew(): """ Start a new subprocess if there is work to do """ global NextURLNo global Processes if NextURLNo < MaxUrls: proc = subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit]) print ("Started to Process %s", urllist[NextURLNo]) NextURLNo += 1 Processes.append(proc) def CheckRunning(): """ Check any running processes and start new ones if there are spare slots.""" global Processes global NextURLNo for p in range(len(Processes):0:-1): # Check the processes in reverse order if Processes[p].poll() is not None: # If the process hasn't finished will return None del Processes[p] # Remove from list - this is why we needed reverse order while (len(Processes) < MaxProcesses) and (NextURLNo < MaxUrls): # More to do and some spare slots StartNew() if __name__ == "__main__": CheckRunning() # This will start the max processes running while (len(Processes) > 0): # Some thing still going on. time.sleep(0.1) # You may wish to change the time for this CheckRunning() print ("Done!") 

Simplemente continúe contando a medida que los inicia y use una callback de cada subproceso para iniciar uno nuevo si hay entradas de la lista de url para procesar.

por ejemplo, asumiendo que su subproceso llama al método OnExit que se le pasa cuando finaliza:

 NextURLNo = 0 MaxProcesses = 20 NoSubProcess = 0 MaxUrls = 100000 def StartNew(): """ Start a new subprocess if there is work to do """ global NextURLNo global NoSubProcess if NextURLNo < MaxUrls: subprocess.Popen(['python', 'script.py', urllist[NextURLNo], OnExit]) print "Started to Process", urllist[NextURLNo] NextURLNo += 1 NoSubProcess += 1 def OnExit(): NoSubProcess -= 1 if __name__ == "__main__": for n in range(MaxProcesses): StartNew() while (NoSubProcess > 0): time.sleep(1) if (NextURLNo < MaxUrls): for n in range(NoSubProcess,MaxProcesses): StartNew() 

Para mantener un número constante de solicitudes simultáneas, puede utilizar un grupo de subprocesos:

 #!/usr/bin/env python from multiprocessing.dummy import Pool def process_url(url): # ... handle a single url urllist = [url1, url2, url3, .. , url100000] for _ in Pool(20).imap_unordered(process_url, urllist): pass 

Para ejecutar procesos en lugar de subprocesos, elimine .dummy de la importación.