Python: uso de subprocesos o una cola para iterar en un bucle for que llama a una función

Soy bastante nuevo en Python y estoy creando un script que permite llevar datos de nube de puntos de otros progtwigs a Autodesk Maya. Mi script funciona bien, pero lo que estoy tratando de hacer es hacerlo más rápido. Tengo un bucle for que recorre una lista de archivos numerados. Es decir, datafile001.txt, datafile002.txt y así sucesivamente. ¿Lo que me pregunto es si hay una manera de hacerlo para hacer más de uno a la vez, posiblemente usando hilos o una cola? Abajo tengo el código en el que he estado trabajando:

def threadedFuntion(args): if len(sourceFiles) > 3: for count, item in enumerate(sourceFiles): t1=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber1], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType)) t1.start() t2=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber2], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType)) t2.start() t3=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber3], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType)) t3.start() t4=Thread(target=convertPcToPdc,args=(sourceFiles[filenumber4], particlesName, startframe, endframe, pdcIncrements, outputDirectory, variableFolder, acceptableArrayforms, dataType)) t4.start() 

Obviamente, esto no funciona por varias razones, primero solo creará 4 subprocesos, me gustaría poder dar una opción por más o menos. Segundo, ¿errores porque está tratando de reutilizar un hilo? Como dije, soy bastante nuevo en Python y estoy un poco desconcertado, he estado leyendo varias publicaciones aquí pero no logro que una funcione correctamente. Creo que una cola puede ser algo que necesito, pero no pude resolverlo, experimenté con la statement de condición y con la statement de unión, pero una vez más no pude obtener lo que quiero.

Supongo que para ser más específico, lo que quiero lograr es que la función está leyendo un archivo de texto, recuperando coords y luego exportándolos como un archivo binario para que maya lea. Es común que uno de estos archivos de texto tenga 5-10 millones de coordenadas x, y, z, lo que lleva bastante tiempo. El administrador de tareas dice que python solo usa un 12% de procesador y alrededor de un 1% de ram, por lo que si pudiera hacer varios de estos a la vez, haría que esos 100 o Más archivos pasan mucho más rápido. No creo que sea difícil hacer multiproceso / poner en cola un bucle for, pero he estado perdido y probando soluciones fallidas durante aproximadamente una semana.

Gracias a todos por cualquier ayuda, realmente lo aprecio y creo que este sitio web es increíble. Esta es mi primera publicación, pero siento que he aprendido completamente sobre Python solo por leer aquí.

Subprocesamiento de subprocesos. Suba y coloque su función de trabajo en esa clase como parte de run ().

 import threading import time import random class Worker(threading.Thread): def __init__(self, srcfile, printlock,**kwargs): super(Worker,self).__init__(**kwargs) self.srcfile = srcfile self.lock = printlock # so threads don't step on each other's prints def run(self): with self.lock: print("starting %s on %s" % (self.ident,self.srcfile)) # do whatever you need to, return when done # example, sleep for a random interval up to 10 seconds time.sleep(random.random()*10) with self.lock: print("%s done" % self.ident) def threadme(srcfiles): printlock = threading.Lock() threadpool = [] for file in srcfiles: threadpool.append(Worker(file,printlock)) for thr in threadpool: thr.start() # this loop will block until all threads are done # (however it won't necessarily first join those that are done first) for thr in threadpool: thr.join() print("all threads are done") if __name__ == "__main__": threadme(["abc","def","ghi"]) 

Según lo solicitado, para limitar el número de subprocesos, utilice lo siguiente:

 def threadme(infiles,threadlimit=None,timeout=0.01): assert threadlimit is None or threadlimit > 0, \ "need at least one thread"; printlock = threading.Lock() srcfiles = list(infiles) threadpool = [] # keep going while work to do or being done while srcfiles or threadpool: # while there's room, remove source files # and add to the pool while srcfiles and \ (threadlimit is None \ or len(threadpool) < threadlimit): file = srcfiles.pop() wrkr = Worker(file,printlock) wrkr.start() threadpool.append(wrkr) # remove completed threads from the pool for thr in threadpool: thr.join(timeout=timeout) if not thr.is_alive(): threadpool.remove(thr) print("all threads are done") if __name__ == "__main__": for lim in (1,2,3,4): print("--- Running with thread limit %i ---" % lim) threadme(("abc","def","ghi"),threadlimit=lim) 

Tenga en cuenta que esto realmente procesará las fonts a la inversa (debido a la lista pop ()). Si necesita que se hagan en orden, invierta la lista en algún lugar, o use un deque y popleft ().

Recomendaría usar mrjob para esto.

El señor Job es una implementación en python del mapa reducido .

A continuación se muestra el código de trabajo de mr para hacer un recuento de palabras de múltiples subprocesos en muchos archivos de texto:

 from mrjob.job import MRJob class MRWordCounter(MRJob): def get_words(self, key, line): for word in line.split(): yield word, 1 def sum_words(self, word, occurrences): yield word, sum(occurrences) def steps(self): return [self.mr(self.get_words, self.sum_words),] if __name__ == '__main__': MRWordCounter.run() 

Este código mapea todos los archivos en paralelo (cuenta las palabras para cada archivo), luego reduce los diferentes conteos en un solo conteo total de palabras.