How do I run os.walk in parallel in Python?

I wrote a simple Java application that takes a list of paths and outputs a file containing every file path found under those original paths.

If I have a paths.txt that contains:

    c:\folder1\
    c:\folder2\
    ...
    ...
    c:\folder1000\

My application runs the recursive function on each path from multiple threads and produces a file containing every file path under those folders.

Now I want to write this application in Python.

I have written a simple app that uses os.walk() to go over a given folder and print the file paths it finds to the output.
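Roughly, that sequential version looks like this (a minimal sketch; print_tree and the starting folder are illustrative names, not the actual code):

    import os

    # Minimal sketch of the sequential version: walk one folder and print every file path.
    # 'c:\\folder1' is just an illustrative starting folder.
    def print_tree(top):
        for dirpath, dirnames, filenames in os.walk(top):
            for name in filenames:
                print(os.path.join(dirpath, name))

    print_tree('c:\\folder1')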

Now I want to run it in parallel, and I have seen that Python has some modules for this: threading and multiprocessing.

What is the best way to do this, and how do I go about it?

Here is a multiprocessing solution:

    from multiprocessing.pool import Pool
    from multiprocessing import JoinableQueue as Queue
    import os

    def explore_path(path):
        directories = []
        nondirectories = []
        for filename in os.listdir(path):
            fullname = os.path.join(path, filename)
            if os.path.isdir(fullname):
                directories.append(fullname)
            else:
                nondirectories.append(filename)
        outputfile = path.replace(os.sep, '_') + '.txt'
        with open(outputfile, 'w') as f:
            for filename in nondirectories:
                print >> f, filename
        return directories

    def parallel_worker():
        while True:
            path = unsearched.get()
            dirs = explore_path(path)
            for newdir in dirs:
                unsearched.put(newdir)
            unsearched.task_done()

    # acquire the list of paths
    with open('paths.txt') as f:
        paths = f.read().split()

    unsearched = Queue()
    for path in paths:
        unsearched.put(path)

    pool = Pool(5)
    for i in range(5):
        pool.apply_async(parallel_worker)

    unsearched.join()
    print 'Done'
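Each worker pulls a directory off the shared JoinableQueue, writes that directory's plain files to its own output file, and pushes any subdirectories it finds back onto the queue; unsearched.join() then blocks until every queued directory has been processed. Note that this is Python 2 code (the print statements), and that the workers reach the module-level unsearched queue by inheritance, which works with the fork start method but not with spawn (the Windows default). As a point of comparison, here is a hedged Python 3 sketch that splits the work differently, one worker process per top-level folder from paths.txt; list_files and output.txt are illustrative names, not part of the answer above:

    import os
    from concurrent.futures import ProcessPoolExecutor

    def list_files(top):
        """Walk one top-level folder and return every file path under it."""
        found = []
        for dirpath, dirnames, filenames in os.walk(top):
            for name in filenames:
                found.append(os.path.join(dirpath, name))
        return found

    if __name__ == '__main__':
        with open('paths.txt') as f:
            paths = [line.strip() for line in f if line.strip()]

        # One worker process per top-level folder, results gathered in order.
        with ProcessPoolExecutor(max_workers=5) as pool, open('output.txt', 'w') as out:
            for files in pool.map(list_files, paths):
                for name in files:
                    out.write(name + '\n')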

This is a pattern for threads in Python that has been useful to me. I'm not sure whether threads will boost your performance, though, because of the way threading works in CPython (the Global Interpreter Lock).

    import threading
    import Queue
    import os

    class PathThread(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue

        def printfiles(self, p):
            for path, dirs, files in os.walk(p):
                for f in files:
                    print path + "/" + f

        def run(self):
            while True:
                path = self.queue.get()
                self.printfiles(path)
                self.queue.task_done()

    # threadsafe queue
    pathqueue = Queue.Queue()
    paths = ["foo", "bar", "baz"]

    # spawn threads
    for i in range(0, 5):
        t = PathThread(pathqueue)
        t.setDaemon(True)
        t.start()

    # add paths to queue
    for path in paths:
        pathqueue.put(path)

    # wait for queue to get empty
    pathqueue.join()
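The snippet above is Python 2 (the Queue module and the print statement). A minimal sketch of the same queue-of-folders pattern in Python 3, keeping the same placeholder folder names:

    import os
    import queue
    import threading

    def worker(pathqueue):
        # Each thread repeatedly takes a folder off the queue and prints its files.
        while True:
            path = pathqueue.get()
            for dirpath, dirnames, filenames in os.walk(path):
                for name in filenames:
                    print(os.path.join(dirpath, name))
            pathqueue.task_done()

    pathqueue = queue.Queue()
    paths = ["foo", "bar", "baz"]  # placeholder folders, as in the snippet above

    for _ in range(5):
        threading.Thread(target=worker, args=(pathqueue,), daemon=True).start()

    for path in paths:
        pathqueue.put(path)

    pathqueue.join()  # blocks until every queued folder has been walked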

Even threading can be quite useful for directory traversal. I use the following code to walk a SharePoint tree, getting a fairly significant speedup with about 50 threads. This particular program returns (path, data) pairs for all XML files in a directory structure, and can easily be extended for your own use. (This is cut and pasted from my program; some additional editing is still needed.)

    import sys
    import time
    import queue
    import threading
    import traceback
    from datetime import datetime
    from os import listdir
    from os.path import isdir, join, getmtime

    class ScanError(Exception):
        """Raised in the main thread when a worker reports a failure."""

    #unique string for error passing error messages
    ERROR = '\xffERROR\xff'

    class ScanWorker(threading.Thread):
        """Worker class for scanning directory structures.
        pathQueue: queue for pathnames of directories
        resultQueue: results of processFile, pairs of (path, data) to be updated
        """
        lock = threading.Lock()
        dirCount = 0

        def __init__(self, pathQueue, resultQueue):
            self.pathQueue = pathQueue
            self.resultQueue = resultQueue
            super().__init__()

        def run(self):
            """Worker thread.
            Get a directory, process it, and put new directories on the queue."""
            try:
                while True:
                    self.processDir(self.pathQueue.get())
                    self.pathQueue.task_done()
            except Exception as e:
                #pass on exception to main thread
                description = traceback.format_exception(*sys.exc_info())
                description.insert(0, "Error in thread {}:\n".format(
                    threading.current_thread().name))
                self.resultQueue.put((ERROR, description))
                self.pathQueue.task_done()

        def processDir(self, top):
            """Visit a directory.
            Call self.processFile on every file, and queue the directories.
            """
            #Wait and retry a few times in case of network errors.
            #SharePoint is not reliable, gives errors for no reason
            for retryCount in range(30):
                try:
                    names = listdir(top)
                    break
                except OSError as e:
                    if e.errno in (2, 22):
                        lastError = e
                        print(end="L", flush=True)
                        time.sleep(1)
                    else:
                        raise
            else:
                print("List: too many retries")
                raise lastError
            #it is not important to worry about race conditions here
            self.__class__.dirCount += 1
            #process contents
            for name in names:
                if isdir(join(top, name)):
                    self.pathQueue.put(join(top, name))
                else:
                    self.processFile(join(top, name))

        def processFile(self, path):
            """Get XML file."""
            #only xml files
            if not path.lower().endswith('.xml'):
                return
            filemtime = datetime.fromtimestamp(getmtime(path))
            #SharePoint is not reliable, gives errors for no reason; just retry
            for retryCount in range(30):
                try:
                    data = open(path, 'rb').read()
                    break
                except OSError as e:
                    if e.errno in (2, 22):
                        lastError = e
                        print(end="R", flush=True)
                        time.sleep(1)
                    else:
                        raise
            else:
                print("Read: too many retries")
                raise lastError
            self.resultQueue.put((path, data))

    class Scanner:
        """Interface to the ScanWorkers.
        SharePoint is pretty fast compared to its delay and handles 50 workers well.
        Make sure you only create one instance of Scanner!
        """
        def __init__(self, workers):
            #don't restrict the path queue length; this causes deadlock
            #we use a LIFO queue to get more depth-first like search,
            #reducing average queue length and hopefully improving server caching
            self.pathQueue = queue.LifoQueue()
            #this is the output queue to the main thread
            self.resultQueue = queue.Queue(5)
            self.workers = workers
            #start workers
            for i in range(workers):
                t = ScanWorker(self.pathQueue, self.resultQueue)
                t.setDaemon(True)
                t.start()

        def startWorkers(self, path):
            #add counter
            self.added = 0
            #and go
            self.pathQueue.put(path)

        def processResult(self, wait=True):
            """Get an element from the result queue, and add to the zip file."""
            path, data = self.resultQueue.get(block=wait)
            if path == ERROR:
                #process gave alarm; stop scanning
                #pass on description
                raise ScanError(data)
            self.resultQueue.task_done()
            self.added += 1

    #main
    threads = 50      #number of worker threads; about 50 worked well against SharePoint
    rootpath = '.'    #root of the directory tree to scan; set for your environment
    try:
        #set up
        scanner = Scanner(threads)
        scanner.startWorkers(rootpath)
        pathQueue, resultQueue = scanner.pathQueue, scanner.resultQueue
        #scanner is rolling; wait for it to finish
        with pathQueue.all_tasks_done:
            while pathQueue.unfinished_tasks:
                #tasks are still running
                #process results
                while True:
                    try:
                        scanner.processResult(wait=False)
                    except queue.Empty:
                        break
                #no new files found; check if scanner is ready
                done = pathQueue.all_tasks_done.wait(timeout=1)
                if not done:
                    #Not yet; print something while we wait
                    print(
                        "\rProcessed {} files from {} directories [{} {}] ".format(
                            scanner.added,
                            ScanWorker.dirCount,
                            pathQueue.unfinished_tasks,
                            resultQueue.unfinished_tasks,
                        ),
                        end='\r')
        #just to make sure everybody is ready: join the path queue
        pathQueue.join()
        #process remaining of result queue
        while resultQueue.unfinished_tasks:
            scanner.processResult(wait=True)
        #go to new line to prevent overwriting progress messages
        print()
    except ScanError as e:
        print()
        print(*e.args[0], end='')
        print("Process interrupted.")
    except KeyboardInterrupt:
        print("\nProcess interrupted.")
        print()