Raspado concurrentemente con selenium en python

Estoy tratando de raspar simultáneamente con los módulos de selenium y multiprocesamiento. A continuación es más o menos mi enfoque:

  • crear cola con el número de instancias de webdriver igual al número de trabajadores
  • crear grupo de trabajadores
  • cada trabajador extrae la instancia de webdriver de la cola
  • Cuando la función termina, la instancia de webdriver se vuelve a poner en la cola.

Aquí está el código:

#!/usr/bin/env python # encoding: utf-8 import time import codecs from selenium import webdriver from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from multiprocessing import Pool from Queue import Queue def download_and_save(link_tuple): link_id, link = link_tuple print link_id w = q.get() w.get(link) with codecs.open('%s.html' % link_id, 'w', encoding='utf-8') as f: f.write(w.page_source) time.sleep(10) q.put(w) def main(num_processes): links = [ 'http://openjurist.org/743/f2d/273/zwiener-v-commissioner-of-internal-revenue', 'http://www.oyez.org/advocates/z/l/lonny_f_zwiener', 'http://www.texasbar.com/attorneys/member.cfm?id=21191', 'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/cited-by', 'https://www.courtlistener.com/opinion/441662/lonny-f-zwiener-and-ardith-e-zwiener-v-commissione/authorities/', 'http://www.myheritage.com/names/lonny_zwiene', 'https://law.resource.org/pub/us/case/reporter/F2/743/743.F2d.273.84-4068.htm', 'http://www.ancestry.com/1940-census/usa/Texas/Lonny-F-Zwiener_5bbff', 'http://search.ancestry.com/cgi-bin/sse.dll?gl=34&rank=1&new=1&so=3&MSAV=0&msT=1&gss=ms_f-34&gl=bmd_death&rank=1&new=1&so=1&MSAV=0&msT=1&gss=ms_f-2_s&gsfn=Lonny&gsln=Zwiener&msypn__ftp=T', 'http://www.mocavo.com/Lonny-F-Zwiener-Fredericksburg-Gillespie-Texas-1940-United-States-Census/0798164756456805432', 'http://www.taftlaw.com/attorneys/635-mark-s-yuric' ] n = len(links) link_tuples = [(link_id, link) for link_id, link in zip(xrange(n), links)] pool = Pool(num_processes) pool.map(download_and_save, link_tuples) if __name__ == '__main__': num_processes = 2 q = Queue() dcap = dict(DesiredCapabilities.PHANTOMJS) dcap["phantomjs.page.settings.userAgent"] = ( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36" ) for i in range(num_processes): w = webdriver.PhantomJS(desired_capabilities=dcap) q.put(w) main(num_processes) 

Este script se ejecuta pero los htmls guardados están duplicados o faltan.

Aquí hay un enfoque diferente con el que he tenido éxito: mantiene a sus trabajadores en __main__, y los trabajadores se retiran de task_q.

 import multiprocessing import traceback class scrapeWorker(multiprocessing.Process): def __init__(self, worker_num, task_q, result_q): super(scrapeWorker, self).__init__() self.worker_num = worker_num self.task_q = task_q self.result_q = result_q self.scraper = my_scraper_class() # this contains driver code, methods, etc. def handleWork(self, work): assert isinstance(work, tuple) or isinstance(work, list), "work should be a tuple or list. found {}".format(type(work)) assert len(work) == 2, "len(work) != 2. found {}".format(work) assert isinstance(work[1], dict), "work[1] should be a dict. found {}".format(type(work[1])) # do the work result = getattr( self.scraper, work[0] )( **work[1] ) self.result_q.put( result ) # worker.run() is actually called via worker.start() def run(self): try: self.scraper.startDriving() while True: work = self.task_q.get() if work == 'KILL': self.scraper.driver.quit() break self.handleWork( work ) except: print traceback.format_exc() raise if __name__ == "__main__": num_workers = 4 manager = multiprocessing.Manager() task_q = manager.Queue() result_q = manager.Queue() workers = [] for worker_num in xrange(num_workers): worker = scrapeWorker(worker_num, task_q, result_q) worker.start() workers.append( worker ) # you decide what job_stuff is # work == [ 'method_name', {'kw_1': val_1, ...} ] for work in job_stuff: task_q.put( work ) results = [] while len(results) < len(job_stuff): results.append( result_q.get() ) for worker in workers: task_q.put( "KILL" ) for worker in workers: worker.join() print "finished!" ####