Aprender python y enhebrar. Creo que mi código corre infinitamente. ¿Ayudarme a encontrar bichos?

Así que he empezado a aprender python ahora, y estoy absolutamente enamorado de él.

Estoy construyendo un raspador de datos de facebook a pequeña escala. Básicamente, utilizará Graph API y raspará los primeros nombres del número especificado de usuarios. Funciona bien en un solo hilo (o ningún hilo, supongo).

Utilicé tutoriales en línea para crear la siguiente versión de multiproceso (código actualizado) :

import requests import json import time import threading import Queue GraphURL = 'http://graph.facebook.com/' first_names = {} # will store first names and their counts queue = Queue.Queue() def getOneUser(url): http_response = requests.get(url) # open the request URL if http_response.status_code == 200: data = http_response.text.encode('utf-8', 'ignore') # Get the text of response, and encode it json_obj = json.loads(data) # load it as a json object # name = json_obj['name'] return json_obj['first_name'] # last = json_obj['last_name'] return None class ThreadGet(threading.Thread): """ Threaded name scraper """ def __init__(self, queue): threading.Thread.__init__(self) self.queue = queue def run(self): while True: #print 'thread started\n' url = GraphURL + str(self.queue.get()) first = getOneUser(url) # get one user's first name if first is not None: if first_names.has_key(first): # if name has been encountered before first_names[first] = first_names[first] + 1 # increment the count else: first_names[first] = 1 # add the new name self.queue.task_done() #print 'thread ended\n' def main(): start = time.time() for i in range(6): t = ThreadGet(queue) t.setDaemon(True) t.start() for i in range(100): queue.put(i) queue.join() for name in first_names.keys(): print name + ': ' + str(first_names[name]) print '----------------------------------------------------------------' print '================================================================' # Print top first names for key in first_names.keys(): if first_names[key] > 2: print key + ': ' + str(first_names[key]) print 'It took ' + str(time.time()-start) + 's' main() 

Para ser honesto, no entiendo algunas partes del código, pero tengo la idea principal. La salida no es nada. Me refiero a que el shell no tiene nada, así que creo que sigue funcionando.

Entonces, lo que estoy haciendo es llenar la queue con enteros que son los identificadores de usuario en fb. Luego, cada ID se utiliza para construir la URL de la llamada api. getOneUser devuelve el nombre de un usuario a la vez. Esa task (ID) está marcada como “terminada” y continúa.

¿Qué está mal con el código de arriba?

Su función de run original solo procesó un elemento de la cola. En todo solo has quitado 5 artículos de la cola.

Usualmente las funciones run ven como

 run(self): while True: doUsefulWork() 

es decir, tienen un bucle que hace que se realice el trabajo recurrente.

[Editar] OP código editado para incluir este cambio.

Algunas otras cosas útiles para probar:

  • Agregue una statement de impresión a la función de run : encontrará que solo se llama 5 veces.
  • Elimine la llamada queue.join() , esto es lo que hace que el módulo se bloquee, luego podrá sondear el estado de la cola.
  • Poner todo el cuerpo de run en una función. Verifique que pueda usar esa función de una manera única para obtener los resultados deseados, luego
  • Pruébelo con un solo subproceso de trabajo, y finalmente vaya a
  • múltiples hilos de trabajo.

Su uso de los first_names no es seguro para subprocesos. Podría agregar un locking para proteger el incremento. De lo contrario, el código debería funcionar. Podría estar llegando a algún límite de API en Facebook, es decir, debe limitar su tasa de solicitud.

Podría simplificar su código utilizando un grupo de hilos y contando los nombres en el hilo principal:

 #!/usr/bin/env python import json import urllib2 from collections import Counter from multiprocessing.dummy import Pool # use threads def get_name(url): try: return json.load(urllib2.urlopen(url))['first_name'] except Exception: return None # error urls = ('http://graph.facebook.com/%d' % i for i in xrange(100)) p = Pool(5) # 5 concurrent connections first_names = Counter(p.imap_unordered(get_name, urls)) print first_names.most_common() 

Para ver qué errores recibe, puede agregar el registro:

 #!/usr/bin/env python import json import logging import urllib2 from collections import Counter from multiprocessing.dummy import Pool # use threads logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(threadName)s %(message)s") def get_name(url): try: name = json.load(urllib2.urlopen(url))['first_name'] except Exception as e: logging.debug('error: %s url: %s', e, url) return None # error else: logging.debug('done url: %s', url) return name urls = ('http://graph.facebook.com/%d' % i for i in xrange(100)) p = Pool(5) # 5 concurrent connections first_names = Counter(p.imap_unordered(get_name, urls)) print first_names.most_common() 

Una forma sencilla de limitar el número de solicitudes por período de tiempo determinado es usar un semáforo:

 #!/usr/bin/env python import json import logging import time import urllib2 from collections import Counter from multiprocessing.dummy import Pool # use threads from threading import _BoundedSemaphore as BoundedSemaphore, Timer logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(threadName)s %(message)s") class RatedSemaphore(BoundedSemaphore): """Limit to 1 request per `period / value` seconds (over long run).""" def __init__(self, value=1, period=1): BoundedSemaphore.__init__(self, value) t = Timer(period, self._add_token_loop, kwargs=dict(time_delta=float(period) / value)) t.daemon = True t.start() def _add_token_loop(self, time_delta): """Add token every time_delta seconds.""" while True: try: BoundedSemaphore.release(self) except ValueError: # ignore if already max possible value pass time.sleep(time_delta) # ignore EINTR def release(self): pass # do nothing (only time-based release() is allowed) def get_name(gid, rate_limit=RatedSemaphore(value=100, period=600)): url = 'http://graph.facebook.com/%d' % gid try: with rate_limit: name = json.load(urllib2.urlopen(url))['first_name'] except Exception as e: logging.debug('error: %s url: %s', e, url) return None # error else: logging.debug('done url: %s', url) return name p = Pool(5) # 5 concurrent connections first_names = Counter(p.imap_unordered(get_name, xrange(200))) print first_names.most_common() 

Después de la ráfaga inicial, debe hacer una sola solicitud cada 6 segundos.

Considere el uso de solicitudes por lotes .