¿Cómo hacer que Apache Spark mapPartition funcione correctamente?

Estoy tratando de hacer un trabajo basado en cada partición y me gustaría devolver los mismos datos como entrada:

from urllib3 import HTTPConnectionPool rdd = sc.parallelize(["peter", "john", "harris"]) def sendPartition(iterator): pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10) for record in iterator: r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'}) return iterator rdd.mapPartitions(sendPartition).count() 

Estoy recibiendo este error:

TypeError: el objeto ‘NoneType’ no es iterable

PD: Esto es solo una simplificación de lo que estoy tratando de lograr. Me gustaría realizar solicitudes de geosearch complejas a ElasticSearch (por lo que no puedo usar el conector de Spark Elasticsearch) para cada elemento. Antes de esta partición de mapas, tengo una gran cantidad de filtros, mapas, etc.

PPS: reinicié mi chispa y ahora obtengo “0” como salida, lo que es mejor que un error, sin embargo, esperaba que fuera “3”.

Con respecto al error de tipo, no parece que pueda reproducirse utilizando el código incluido en la pregunta. Supongo que, en algún momento, el valor None se ha pasado al constructor RDD o se ha devuelto desde sendPartition .

El problema con un RDD vacío como salida es el resultado de la forma en que se usa el iterador de partición. PySpark está utilizando itertools.chain para pasar datos a mapPartition que se comporta más o menos de la misma manera que un Iterator Scala.

 import itertools iter = itertools.chain(range(10)) iter.next() ## 0 

Después de terminar un bucle for

 for x in iter: x 

terminas con una chain vacía:

 type(iter) ## itertools.chain iter.nex() ## Traceback (most recent call last) ## ... ## StopIteration: 

Mientras que StopIteration se maneja como parte de la lógica de iteración normal, no hay datos para devolver.

Existen algunas formas de manejar esto donde lo más limpio es extraer una función y usar la comprensión de lista

 def make_request(record, pool): r = pool.request('GET', '/ajax/services/search/web', fields={'q': 'urllib3', 'v': '1.0'}) return r.read() # Or any other data you need. def sendPartition(iterator): pool = HTTPConnectionPool('ajax.googleapis.com', maxsize=10) return [make_request(record, pool) for record in iterator] 

Tenga en cuenta que si desea utilizar el grupo de conexiones, debe leer los datos antes de salir de mapPartitions . Significa que no hay evaluación perezosa (como generadores). Personalmente, consideraría las solicitudes asíncronas (por ejemplo, con async/await await en 3.5, RxPy en otro lugar) dentro de la partición y la evaluación antes de salir.