Una captura de URL paralela de subprocesos muy simple (sin cola)

Pasé todo el día buscando el captador de URL multiproceso más sencillo posible en Python, pero la mayoría de los scripts que encontré utilizan colas o multiprocesamiento o bibliotecas complejas.

Finalmente escribí uno, que informo como respuesta. Por favor, siéntase libre de sugerir cualquier mejora.

Supongo que otras personas podrían haber estado buscando algo similar.

Simplificando su versión original en la medida de lo posible:

import threading import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] def fetch_url(url): urlHandler = urllib2.urlopen(url) html = urlHandler.read() print "'%s\' fetched in %ss" % (url, (time.time() - start)) threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls] for thread in threads: thread.start() for thread in threads: thread.join() print "Elapsed Time: %s" % (time.time() - start) 

Los únicos trucos nuevos aquí son:

  • Mantenga un registro de los hilos que creas.
  • No se moleste con un contador de hilos si solo quiere saber cuándo están todos hechos; join ya te dice eso.
  • Si no necesita ningún estado o API externa, no necesita una subclase Thread , solo una función de target .

multiprocessing tiene un grupo de subprocesos que no inicia otros procesos:

 #!/usr/bin/env python from multiprocessing.pool import ThreadPool from time import time as timer from urllib2 import urlopen urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] def fetch_url(url): try: response = urlopen(url) return url, response.read(), None except Exception as e: return url, None, e start = timer() results = ThreadPool(20).imap_unordered(fetch_url, urls) for url, html, error in results: if error is None: print("%r fetched in %ss" % (url, timer() - start)) else: print("error fetching %r: %s" % (url, error)) print("Elapsed Time: %s" % (timer() - start,)) 

Las ventajas en comparación con la solución basada en hilos:

  • ThreadPool permite limitar el número máximo de conexiones simultáneas ( 20 en el ejemplo de código)
  • la salida no está distorsionada porque toda la salida está en el hilo principal
  • los errores son registrados
  • El código funciona tanto en Python 2 como en 3 sin cambios (asumiendo que from urllib.request import urlopen en Python 3).

El ejemplo principal en concurrent.futures hace todo lo que desea, mucho más simple. Además, puede manejar una gran cantidad de URL al hacer solo 5 a la vez, y maneja los errores mucho mejor.

Por supuesto, este módulo solo está integrado con Python 3.2 o posterior … pero si está usando 2.5-3.1, puede instalar el puerto trasero, futures , fuera de PyPI. Todo lo que necesita cambiar del código de ejemplo es buscar y reemplazar concurrent.futures con futures y, para 2.x, urllib.request con urllib2 .

Aquí está el ejemplo de backported a 2.x, modificado para usar su lista de URL y para agregar los tiempos:

 import concurrent.futures import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] # Retrieve a single page and report the url and contents def load_url(url, timeout): conn = urllib2.urlopen(url, timeout=timeout) return conn.readall() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print '%r generated an exception: %s' % (url, exc) else: print '"%s" fetched in %ss' % (url,(time.time() - start)) print "Elapsed Time: %ss" % (time.time() - start) 

Pero puedes hacer esto aún más simple. Realmente, todo lo que necesitas es:

 def load_url(url): conn = urllib2.urlopen(url, timeout) data = conn.readall() print '"%s" fetched in %ss' % (url,(time.time() - start)) return data with futures.ThreadPoolExecutor(max_workers=5) as executor: pages = executor.map(load_url, urls) print "Elapsed Time: %ss" % (time.time() - start) 

Ahora estoy publicando una solución diferente, al tener los subprocesos de trabajo not-deamon y unirlos al subproceso principal (lo que significa bloquear el subproceso principal hasta que todos los subprocesos de trabajo hayan finalizado) en lugar de notificar el final de la ejecución de cada subproceso de trabajador con un callback a una función global (como hice en la respuesta anterior), ya que en algunos comentarios se observó que esa forma no es segura para subprocesos.

 import threading import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] class FetchUrl(threading.Thread): def __init__(self, url): threading.Thread.__init__(self) self.url = url def run(self): urlHandler = urllib2.urlopen(self.url) html = urlHandler.read() print "'%s\' fetched in %ss" % (self.url,(time.time() - start)) for url in urls: FetchUrl(url).start() #Join all existing threads to main thread. for thread in threading.enumerate(): if thread is not threading.currentThread(): thread.join() print "Elapsed Time: %s" % (time.time() - start) 

Esta secuencia de comandos obtiene el contenido de un conjunto de URL definidas en una matriz. Genera un hilo para cada URL que se va a buscar, por lo que está destinado a ser utilizado para un conjunto limitado de URL.

En lugar de usar un objeto de cola, cada subproceso está notificando su final con una callback a una función global, que mantiene el recuento del número de subprocesos en ejecución.

 import threading import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] left_to_fetch = len(urls) class FetchUrl(threading.Thread): def __init__(self, url): threading.Thread.__init__(self) self.setDaemon = True self.url = url def run(self): urlHandler = urllib2.urlopen(self.url) html = urlHandler.read() finished_fetch_url(self.url) def finished_fetch_url(url): "callback function called when a FetchUrl thread ends" print "\"%s\" fetched in %ss" % (url,(time.time() - start)) global left_to_fetch left_to_fetch-=1 if left_to_fetch==0: "all urls have been fetched" print "Elapsed Time: %ss" % (time.time() - start) for url in urls: "spawning a FetchUrl thread for each url to fetch" FetchUrl(url).start()