Spark Python Avro Kafka Deserialiser

He creado un flujo de kafka en una aplicación de python spark y puedo analizar cualquier texto que venga a través de él.

kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) 

Quiero cambiar esto para poder analizar los mensajes de un tema kafka. Al analizar los mensajes de avro de un archivo, lo hago como:

  reader = DataFileReader(open("customer.avro", "r"), DatumReader()) 

Soy nuevo en python y spark, ¿cómo cambio la secuencia para poder analizar el mensaje avro? También, ¿cómo puedo especificar un esquema para usar cuando se lee el mensaje de Avro de Kafka? He hecho todo esto en java antes, pero Python me confunde.

Editar:

Intenté cambiar para incluir el decodificador avro

      kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema)) 

    pero me sale el siguiente error

      TypeError: 'DatumReader' object is not callable 

    Tuve el mismo desafío: deserializar avro mensajes de Kafka en pyspark y resolverlo con el método Messageserializer del módulo Confluente de esquema de registro, como en nuestro caso el esquema se almacena en un registro de esquema confluente.

    Puede encontrar ese módulo en https://github.com/verisign/python-confluent-schemaregistry

     from confluent.schemaregistry.client import CachedSchemaRegistryClient from confluent.schemaregistry.serializers import MessageSerializer schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081') serializer = MessageSerializer(schema_registry_client) # simple decode to replace Kafka-streaming's built-in decode decoding UTF8 () def decoder(s): decoded_message = serializer.decode_message(s) return decoded_message kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder) lines = kvs.map(lambda x: x[1]) lines.pprint() 

    Obviamente, como puede ver, este código está utilizando el nuevo enfoque directo sin receptores, por lo tanto, createdDirectStream (vea más en https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html )

    Como lo menciona @Zoltan Fedor en el comentario, la respuesta proporcionada es un poco antigua ahora, ya que han pasado 2,5 años desde que se escribió. La biblioteca confluent-kafka-python ha evolucionado para admitir la misma funcionalidad de forma nativa. Lo único que se necesita en el código dado es el siguiente.

     from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient from confluent_kafka.avro.serializer.message_serializer import MessageSerializer 

    Y luego, puedes cambiar esta línea –

     kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=serializer.decode_message) 

    Lo había probado y funciona muy bien. Estoy agregando esta respuesta para cualquier persona que pueda necesitarla en el futuro.