¿Cuál es el equivalente a scala.util.Try en pyspark?

Tengo un HTTPD access_log pésimo y solo quiero omitir las líneas “pésimas”.

En Scala esto es sencillo:

import scala.util.Try val log = sc.textFile("access_log") log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect() 

Para python tengo la siguiente solución definiendo explícitamente una función en contraste usando la notación “lambda”:

 log = sc.textFile("access_log") def wrapException(a): try: return a[8] except: return 'error' log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect() 

¿Hay una mejor manera de hacer esto (por ejemplo, como en Scala) en pyspark?

¡Muchas gracias!

Mejor es un término subjetivo, pero hay algunos enfoques que puedes probar.

  • Lo más simple que puede hacer en este caso particular es evitar las excepciones de cualquier tipo. Todo lo que necesitas es un flatMap y algunos cortes:

     log.flatMap(lambda s : s.split(' ')[8:9]) 

    Como puede ver, significa que no es necesario un manejo de excepciones o un filter posterior.

  • La idea anterior se puede ampliar con un simple envoltorio.

     def seq_try(f, *args, **kwargs): try: return [f(*args, **kwargs)] except: return [] 

    y ejemplo de uso

     from operator import div # FYI operator provides getitem as well. rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"]) rdd.flatMap(lambda x: seq_try(div, 1., x)).collect() ## [1.0, 0.5, 0.3333333333333333, 0.2] 
  • finalmente más enfoque OO:

     import inspect as _inspect class _Try(object): pass class Failure(_Try): def __init__(self, e): if Exception not in _inspect.getmro(e.__class__): msg = "Invalid type for Failure: {0}" raise TypeError(msg.format(e.__class__)) self._e = e self.isSuccess = False self.isFailure = True def get(self): raise self._e def __repr__(self): return "Failure({0})".format(repr(self._e)) class Success(_Try): def __init__(self, v): self._v = v self.isSuccess = True self.isFailure = False def get(self): return self._v def __repr__(self): return "Success({0})".format(repr(self._v)) def Try(f, *args, **kwargs): try: return Success(f(*args, **kwargs)) except Exception as e: return Failure(e) 

    y ejemplo de uso:

     tries = rdd.map(lambda x: Try(div, 1.0, x)) tries.collect() ## [Success(1.0), ## Success(0.5), ## Failure(ZeroDivisionError('float division by zero',)), ## Success(0.3333333333333333), ## Failure(ZeroDivisionError('float division by zero',)), ## Success(0.2), ## Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))] tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect() ## [1.0, 0.5, 0.3333333333333333, 0.2] 

    Incluso puede utilizar la coincidencia de patrones con multipledispatch

     from multipledispatch import dispatch from operator import getitem @dispatch(Success) def check(x): return "Another great success" @dispatch(Failure) def check(x): return "What a failure" a_list = [1, 2, 3] check(Try(getitem, a_list, 1)) ## 'Another great success' check(Try(getitem, a_list, 10)) ## 'What a failure' 

    Si te gusta este enfoque, he empujado un poco más la implementación completa a GitHub y pypi .

Primero, déjame generar algunos datos aleatorios para comenzar a trabajar.

 import random number_of_rows = int(1e6) line_error = "error line" text = [] for i in range(number_of_rows): choice = random.choice([1,2,3,4]) if choice == 1: line = line_error elif choice == 2: line = "1 2 3 4 5 6 7 8 9_1" elif choice == 3: line = "1 2 3 4 5 6 7 8 9_2" elif choice == 4: line = "1 2 3 4 5 6 7 8 9_3" text.append(line) 

Ahora tengo un text cadena que parece

  1 2 3 4 5 6 7 8 9_2 error line 1 2 3 4 5 6 7 8 9_3 1 2 3 4 5 6 7 8 9_2 1 2 3 4 5 6 7 8 9_3 1 2 3 4 5 6 7 8 9_1 error line 1 2 3 4 5 6 7 8 9_2 .... 

Tu solución:

 def wrapException(a): try: return a[8] except: return 'error' log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect() #[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)] 

Aquí está mi solución:

 from operator import add def myfunction(l): try: return (l.split(' ')[8],1) except: return ('MYERROR', 1) log.map(myfunction).reduceByKey(add).collect() #[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)] 

Comentario:

(1) Recomiendo también calcular las líneas con “error” porque no agregará demasiada sobrecarga, y también se puede usar para verificar la cordura, por ejemplo, todos los recuentos deberían sumr el número total de filas en el log, si filtra esas líneas, no tiene idea de que sean realmente malas líneas o algo salió mal en su lógica de encoding.

(2) Intentaré empaquetar todas las operaciones de nivel de línea en una función para evitar el encadenamiento del map , funciones de filter , para que sea más legible.

(3) Desde la perspectiva del rendimiento, generé una muestra de registros 1M y mi código terminó en 3 segundos y el tuyo en 2 segundos, no es una comparación justa, ya que los datos son muy pequeños y mi grupo es bastante robusto, te recomendaría genere un archivo más grande (1e12?) y haga un punto de referencia en el suyo.