Cliente de Google PubSub python que devuelve StatusCode.UNAVAILABLE

Estoy intentando establecer una suscripción Pull de larga duración a un tema de Google Cloud PubSub. Estoy usando un código muy similar al ejemplo dado en la documentación aquí , es decir:

def receive_messages(project, subscription_name): """Receives messages from a pull subscription.""" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( project, subscription_name) def callback(message): print('Received message: {}'.format(message)) message.ack() subscriber.subscribe(subscription_path, callback=callback) # The subscriber is non-blocking, so we must keep the main thread from # exiting to allow it to process messages in the background. print('Listening for messages on {}'.format(subscription_path)) while True: time.sleep(60) 

El problema es que a veces recibo el siguiente rastreo:

 Exception in thread Consumer helper: consume bidirectional stream: Traceback (most recent call last): File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner self.run() File "/usr/lib/python3.5/threading.py", line 862, in run self._target(*self._args, **self._kwargs) File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 248, in _blocking_consume self._policy.on_exception(exc) File "/path/to/google/cloud/pubsub_v1/subscriber/policy/thread.py", line 135, in on_exception raise exception File "/path/to/google/cloud/pubsub_v1/subscriber/_consumer.py", line 234, in _blocking_consume for response in response_generator: File "/path/to/grpc/_channel.py", line 348, in __next__ return self._next() File "/path/to/grpc/_channel.py", line 342, in _next raise self grpc._channel._Rendezvous:  

Vi que esto estaba referenciado en otra pregunta, pero aquí estoy preguntando cómo manejarlo adecuadamente en Python. He intentado ajustar la solicitud en una excepción, pero parece que se ejecuta en segundo plano y no puedo volver a intentarlo en caso de que se produzca ese error.

Un enfoque algo intrincado que funciona para mí es una clase de política personalizada. El predeterminado tiene una función on_exception que ignora DEADLINE_EXCEEDED . Puedes hacer una clase que herede el valor predeterminado y también ignora UNAVAILABLE . El mío se parece a esto:

 from google.cloud import pubsub from google.cloud.pubsub_v1.subscriber.policy import thread import grpc class AvailablePolicy(thread.Policy): def on_exception(self, exception): """The parent ignores DEADLINE_EXCEEDED. Let's also ignore UNAVAILABLE. I'm not sure what triggers that error, but if you ignore it, your subscriber seems to work just fine. It's probably an intermittent thing and it reconnects later if you just give it a chance. """ # If this is UNAVAILABLE, then we want to retry. # That entails just returning None. unavailable = grpc.StatusCode.UNAVAILABLE if getattr(exception, 'code', lambda: None)() == unavailable: return # For anything else, fallback on super. super(AvailablePolicy, self).on_exception(exception) subscriber = pubsub.SubscriberClient(policy_class=AvailablePolicy) # Continue to set up as normal. 

Se parece mucho a la on_exception original, simplemente ignora un error diferente. Si lo desea, puede agregar algún registro cada vez que se lance la excepción y verificar que todo funcione. Los mensajes futuros seguirán llegando.