Diferencias de rendimiento al utilizar coroutines vs threading.

Hace unos días hice una pregunta en SO sobre cómo ayudarme a diseñar un paradigma para estructurar múltiples solicitudes HTTP

Aquí está el escenario. Me gustaría tener un sistema multi-productor, multi-consumidor. Mis productores rastrean y raspan algunos sitios y agregan los enlaces que encuentra en una cola. Dado que rastrearé varios sitios, me gustaría tener varios productores / rastreadores.

Los consumidores / trabajadores se alimentan de esta cola, realizan solicitudes de TCP / UDP a estos enlaces y guardan los resultados en mi Django DB. También me gustaría tener varios trabajadores ya que cada elemento de la cola es totalmente independiente el uno del otro.

La gente sugirió usar una biblioteca de coroutine para esto, es decir, Gevent o Eventlet. Al no haber trabajado nunca con coroutines, leí que aunque el paradigma de progtwigción es similar a los paradigmas de subprocesos, solo se está ejecutando un subproceso pero cuando se producen llamadas de locking, como las llamadas de E / S, las stacks se cambian en la memoria y el otro en verde. el hilo toma el control hasta que encuentra algún tipo de locking de una llamada de E / S. Con suerte tengo este derecho? Aquí está el código de una de mis publicaciones de SO:

import gevent from gevent.queue import * import time import random q = JoinableQueue() workers = [] producers = [] def do_work(wid, value): gevent.sleep(random.randint(0,2)) print 'Task', value, 'done', wid def worker(wid): while True: item = q.get() try: print "Got item %s" % item do_work(wid, item) finally: print "No more items" q.task_done() def producer(): while True: item = random.randint(1, 11) if item == 10: print "Signal Received" return else: print "Added item %s" % item q.put(item) for i in range(4): workers.append(gevent.spawn(worker, random.randint(1, 100000))) # This doesn't work. for j in range(2): producers.append(gevent.spawn(producer)) # Uncommenting this makes this script work. # producer() q.join() 

Esto funciona bien porque las llamadas de sleep están bloqueando las llamadas y cuando ocurre un evento de sleep , otro hilo verde se hace cargo. Esto es mucho más rápido que la ejecución secuencial. Como puede ver, no tengo ningún código en mi progtwig que cede intencionalmente la ejecución de un hilo a otro hilo. No veo cómo esto encaja en el escenario anterior, ya que me gustaría que todos los subprocesos se ejecuten simultáneamente.

Todo funciona bien, pero siento que el rendimiento que he logrado utilizando Gevent / Eventlets es mayor que el progtwig de ejecución secuencial original, pero drásticamente menor que lo que se podría lograr con el uso de subprocesos reales.

Si tuviera que volver a implementar mi progtwig usando mecanismos de subprocesamiento, cada uno de mis productores y consumidores podrían estar trabajando simultáneamente sin la necesidad de intercambiar stacks dentro y fuera como las rutinas.

¿Debería esto ser re-implementado usando hilos? ¿Está mal mi diseño? No he podido ver los beneficios reales de usar coroutines.

Tal vez mis conceptos sean poco confusos, pero esto es lo que he asimilado. Cualquier ayuda o aclaración de mi paradigma y conceptos sería genial.

Gracias

Como puede ver, no tengo ningún código en mi progtwig que cede intencionalmente la ejecución de un hilo a otro hilo. No veo cómo esto encaja en el escenario anterior, ya que me gustaría que todos los subprocesos se ejecuten simultáneamente.

Hay un solo hilo del sistema operativo, pero varios greenlets. En su caso, gevent.sleep() permite que los trabajadores se ejecuten simultáneamente. El locking de llamadas IO como urllib2.urlopen(url).read() hace lo mismo si usa urllib2 parcheado para trabajar con gevent (llamando a gevent.monkey.patch_*() ).

Vea también Un curso curioso sobre coroutines y concurrencia para comprender cómo un código puede funcionar simultáneamente en un entorno de un solo hilo.

Para comparar las diferencias de rendimiento entre gevent, subprocesamiento y multiprocesamiento, puede escribir el código que sea compatible con todos los enfoques:

 #!/usr/bin/env python concurrency_impl = 'gevent' # single process, single thread ##concurrency_impl = 'threading' # single process, multiple threads ##concurrency_impl = 'multiprocessing' # multiple processes if concurrency_impl == 'gevent': import gevent.monkey; gevent.monkey.patch_all() import logging import time import random from itertools import count, islice info = logging.info if concurrency_impl in ['gevent', 'threading']: from Queue import Queue as JoinableQueue from threading import Thread if concurrency_impl == 'multiprocessing': from multiprocessing import Process as Thread, JoinableQueue 

El rest del script es el mismo para todas las implementaciones de concurrencia:

 def do_work(wid, value): time.sleep(random.randint(0,2)) info("%d Task %s done" % (wid, value)) def worker(wid, q): while True: item = q.get() try: info("%d Got item %s" % (wid, item)) do_work(wid, item) finally: q.task_done() info("%d Done item %s" % (wid, item)) def producer(pid, q): for item in iter(lambda: random.randint(1, 11), 10): time.sleep(.1) # simulate a green blocking call that yields control info("%d Added item %s" % (pid, item)) q.put(item) info("%d Signal Received" % (pid,)) 

No ejecute código a nivel de módulo, póngalo en main() :

 def main(): logging.basicConfig(level=logging.INFO, format="%(asctime)s %(process)d %(message)s") q = JoinableQueue() it = count(1) producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)] workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)] for t in producers+workers: t.daemon = True t.start() for t in producers: t.join() # put items in the queue q.join() # wait while it is empty # exit main thread (daemon workers die at this point) if __name__=="__main__": main() 

gevent es genial cuando tienes muchos hilos (verdes). Lo probé con miles y funcionó muy bien. debe asegurarse de que todas las bibliotecas que utilice para raspar y guardar en la base de datos se vuelvan ecológicas. Afaik, si usan el zócalo de Python, la inyección de Gevent debería funcionar. las extensiones escritas en C (por ejemplo, mysqldb) bloquearían sin embargo y necesitarías usar equivalentes verdes en su lugar.

si usa gevent, podría eliminar las colas, generar un nuevo hilo (verde) para cada tarea, el código del hilo es tan simple como db.save(web.get(address)) . gevent se encargará de la prevención cuando alguna biblioteca en db o web bloquee. Funcionará mientras sus tareas encajen en la memoria.

En este caso, su problema no es con la velocidad del progtwig (es decir, la elección de gevent o subprocesos), sino con el rendimiento de IO de la red. Ese es (debería ser) el cuello de botella que determina qué tan rápido se ejecuta el progtwig.

Gevent es una buena manera de asegurarse de que sea el cuello de botella y no de la architecture de su progtwig.

Este es el tipo de proceso que querrías:

 import gevent from gevent.queue import Queue, JoinableQueue from gevent.monkey import patch_all patch_all() # Patch urllib2, etc def worker(work_queue, output_queue): for work_unit in work_queue: finished = do_work(work_unit) output_queue.put(finished) work_queue.task_done() def producer(input_queue, work_queue): for url in input_queue: url_list = crawl(url) for work in url_list: work_queue.put(work) input_queue.task_done() def do_work(work): gevent.sleep(0) # Actually proces link here return work def crawl(url): gevent.sleep(0) return list(url) # Actually process url here input = JoinableQueue() work = JoinableQueue() output = Queue() workers = [gevent.spawn(worker, work, output) for i in range(0, 10)] producers = [gevent.spawn(producer, input, work) for i in range(0, 10)] list_of_urls = ['foo', 'bar'] for url in list_of_urls: input.put(url) # Wait for input to finish processing input.join() print 'finished producing' # Wait for workers to finish processing work work.join() print 'finished working' # We now have output! print 'output:' for message in output: print message # Or if you'd like, you could use the output as it comes! 

No es necesario esperar a que finalicen las colas de entrada y trabajo, ya lo he demostrado aquí.