Crear dos tareas asíncronas simultáneamente con asyncio

Necesito crear un software que reciba simultáneamente desde el socket y la tubería web y envíe los mensajes al otro canal (recibe del socket, crea un nuevo hilo y lo envía a la tubería. De la misma manera que recibe de la tubería, crea un nuevo hilo y lo envía al socket).

Tengo un problema con el subprocesamiento múltiple, en el arranque del progtwig tengo que iniciar los métodos socket_receiver y pipe_receiver pero solo puedo iniciar pipe_receiver . Intenté eliminar todo el código y guardo solo socket_receiver y pipe_receiver pero solo ingresa en el while True del pipe_receiver .

 import asyncio import sys import json from concurrent.futures.thread import ThreadPoolExecutor import websockets # make the Pool of workers executor = ThreadPoolExecutor(max_workers=10) # Make connection to socket and pipe header = {"Authorization": r"Basic XXXX="} connection = websockets.connect('wss://XXXXXXXX', extra_headers=header) async def socket_receiver(): """Listening from web socket""" async with connection as web_socket: while True: message = await web_socket.recv() # send the message to the pipe in a new thread executor.submit(send_to_pipe(message)) async def pipe_receiver(): """Listening from pipe""" while True: message = sys.stdin.readline() if not message: break executor.submit(send_to_socket(message)) # jsonValue = json.dump(str(line), file); sys.stdout.flush() def send_to_pipe(message): # Check if message is CAM or DENM json_message = json.loads(message) type = int(json_message["header"]["messageID"]) # 1 is DENM message, 2 is CAM message if type == 1 or type == 2: # send the message to the pipe sys.stdout.print(json_message); async def send_to_socket(message): async with connection as web_socket: json_message = json.dumps(message) await web_socket.send(json_message) asyncio.get_event_loop().run_until_complete( asyncio.gather(socket_receiver(),pipe_receiver())) 

Este progtwig es llamado por un subproceso, el proceso padre se comunica con él a través de tuberías conectadas a stdout y stdin.

ACTUALIZACIÓN: Recibo esta excepción con el código @Martijn Pieters

 Traceback (most recent call last): File "X", line 121, in  main() File "X", line 119, in main loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro)) File "X\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 568, in run_until_complete return future.result() File "X", line 92, in connect_pipe reader, writer = await stdio() File "X", line 53, in stdio lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 1421, in connect_read_pipe transport = self._make_read_pipe_transport(pipe, protocol, waiter) File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 433, in _make_read_pipe_transport raise NotImplementedError NotImplementedError 

No está utilizando el ThreadPoolExecutor correctamente, y realmente no quiere usar eso aquí. En su lugar, debe configurar a los consumidores y productores para manejar su socket y tubería con colas para enviar mensajes entre ellos.

  • para cada tipo de conexión, cree una conexión electrónica que cree la conexión, luego pase esa conexión única a las tareas de un consumidor y productor (creadas con asyncio.ensure_future() ) para esa conexión. Utilice asyncio.wait() para ejecutar ambas tareas con return_when=asyncio.FIRST_COMPLETED , de modo que puede cancelar cualquiera que aún se esté ejecutando cuando uno de los dos complete el proceso “early” (por ejemplo, ha fallado).

  • Use una cola para pasar los mensajes del consumidor de uno al productor de la otra conexión.

  • sys.stdin y sys.stdout están bloqueando las secuencias, ¡no se sys.stdout a leerlas y escribirlas! Consulte https://gist.github.com/nathan-hoad/8966377 para obtener una idea general que intenta configurar las secuencias de STDIO sin locking y este problema de asyncio que solicita una función de secuencias sin locking.

  • No use una conexión de socket global, ciertamente no con dos async with separados async with declaraciones. Su método send_to_socket() realmente cerraría el socket porque el async with connection as web_socket: context manager se cierra cuando se envía el primer mensaje, y esto causa problemas para el código socket_receiver que asume que el socket permanece abierto indefinidamente.

  • ¡No uses hilos aquí! Sus conexiones están totalmente gestionadas por asyncio, el subprocesamiento sería muy importante en esto.

  • asyncio.Executor() instancias de asyncio.Executor() solo deben usarse con callables regulares, no con coroutines. Executor.submit() indica que toma un llamable, pasar una coroutine con executor.submit(send_to_pipe(message)) o executor.submit(send_to_socket(message)) hará que se genere una excepción ya que las coroutines no son callables. Probablemente no esté viendo un mensaje de excepción, ya que esa excepción se genera en el otro hilo.

    Esta es la razón por la que tu socket_receiver() coroutine falla; ciertamente comienza pero los bashs de enviar mensajes fallan Cuando ejecuto su código contra un servidor local de websocket simulado, se imprime una advertencia:

     RuntimeWarning: coroutine 'send_to_socket' was never awaited executor.submit(send_to_socket(message)) 

    Cuando no se espera una coroutine, el código en esa coroutine nunca se ejecuta. Envolviendo la coroutine en una que imprime la excepción a stderr ( try: callable(), except Exception: traceback.print_exc(file=sys.stderr)) ) obtienes:

     Traceback (most recent call last): File "soq52219672.py", line 15, in log_exception callable() TypeError: 'coroutine' object is not callable 

Los ejecutores solo deben usarse para integrar código que no se puede convertir al uso de coroutines; El ejecutor administra ese código para que se ejecute en paralelo a las tareas asyncio sin interferencia. Se debe tener cuidado si el código desea interactuar con asyncio tareas asyncio , siempre use asyncio.run_coroutine_threadsafe() o asyncio.call_soon_threadsafe() para llamar a través del límite. Vea la sección Concurrencia y multihilo .

Este es un ejemplo de cómo reescribiría su código para usar el patrón de consumidor / productor, con stdio() basado en la esencia de Nathan Hoad sobre el tema , más un respaldo para Windows donde el soporte para tratar stdio como tuberías es limitado :

 import asyncio import json import os import sys import websockets async def socket_consumer(socket, outgoing): # take messages from the web socket and push them into the queue async for message in socket: await outgoing.put(message) async def socket_producer(socket, incoming): # take messages from the queue and send them to the socket while True: message = await incoming.get() jsonmessage = json.dumps(message) await socket.send(jsonmessage) async def connect_socket(incoming, outgoing, loop=None): header = {"Authorization": r"Basic XXXX="} uri = 'wss://XXXXXXXX' async with websockets.connect(uri, extra_headers=header) as websocket: # create tasks for the consumer and producer. The asyncio loop will # manage these independently consumer_task = asyncio.ensure_future( socket_consumer(websocket, outgoing), loop=loop) producer_task = asyncio.ensure_future( socket_producer(websocket, incoming), loop=loop) # start both tasks, but have the loop return to us when one of them # has ended. We can then cancel the remainder done, pending = await asyncio.wait( [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED) for task in pending: task.cancel() # force a result check; if there was an exception it'll be re-raised for task in done: task.result() # pipe support async def stdio(loop=None): if loop is None: loop = asyncio.get_event_loop() if sys.platform == 'win32': # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832 # use an executor to read from stdio and write to stdout class Win32StdinReader: def __init__(self): self.stdin = sys.stdin.buffer async def readline(): # a single call to sys.stdin.readline() is thread-safe return await loop.run_in_executor(None, self.stdin.readline) class Win32StdoutWriter: def __init__(self): self.buffer = [] self.stdout = sys.stdout.buffer def write(self, data): self.buffer.append(data) async def drain(self): data, self.buffer = self.buffer, [] # a single call to sys.stdout.writelines() is thread-safe return await loop.run_in_executor(None, sys.stdout.writelines, data) return Win32StdinReader(), Win32StdoutWriter() reader = asyncio.StreamReader() await loop.connect_read_pipe( lambda: asyncio.StreamReaderProtocol(reader), sys.stdin) writer_transport, writer_protocol = await loop.connect_write_pipe( asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), 'wb')) writer = asyncio.streams.StreamWriter( writer_transport, writer_protocol, None, loop) return reader, writer async def pipe_consumer(pipereader, outgoing): # take messages from the pipe and push them into the queue while True: message = await pipereader.readline() if not message: break await outgoing.put(message.decode('utf8')) async def pipe_producer(pipewriter, incoming): # take messages from the queue and send them to the pipe while True: jsonmessage = await incoming.get() message = json.loads(jsonmessage) type = int(message.get('header', {}).get('messageID', -1)) # 1 is DENM message, 2 is CAM message if type in {1, 2}: pipewriter.write(jsonmessage.encode('utf8') + b'\n') await pipewriter.drain() async def connect_pipe(incoming, outgoing, loop=None): reader, writer = await stdio() # create tasks for the consumer and producer. The asyncio loop will # manage these independently consumer_task = asyncio.ensure_future( pipe_consumer(reader, outgoing), loop=loop) producer_task = asyncio.ensure_future( pipe_producer(writer, incoming), loop=loop) # start both tasks, but have the loop return to us when one of them # has ended. We can then cancel the remainder done, pending = await asyncio.wait( [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED) for task in pending: task.cancel() # force a result check; if there was an exception it'll be re-raised for task in done: task.result() def main(): loop = asyncio.get_event_loop() pipe_to_socket = asyncio.Queue(loop=loop) socket_to_pipe = asyncio.Queue(loop=loop) socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop) pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop) loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro)) if __name__ == '__main__': main() 

Esto comienza con dos tareas, una para administrar el zócalo y la otra para administrar la conexión STDIO. Ambos inician 2 tareas más, para su consumidor y productor. Hay dos colas para enviar los mensajes del consumidor de uno y al productor del otro.