¿Cómo manejar asyncore dentro de una clase en python, sin bloquear nada?

Necesito crear una clase que pueda recibir y almacenar mensajes SMTP, es decir, correos electrónicos. Para hacerlo, estoy usando asyncore acuerdo con un ejemplo publicado aquí . Sin embargo, asyncore.loop() está bloqueando, así que no puedo hacer nada más en el código.

Así que pensé en usar hilos. Aquí hay un código de ejemplo que muestra lo que tengo en mente:

 class MyServer(smtpd.SMTPServer): # derive from the python server class def process_message(..): # overwrite a smtpd.SMTPServer method to be able to handle the received messages ... self.list_emails.append(this_email) def get_number_received_emails(self): """Return the current number of stored emails""" return len(self.list_emails) def start_receiving(self): """Start the actual server to listen on port 25""" self.thread = threading.Thread(target=asyncore.loop) self.thread.start() def stop(self): """Stop listening now to port 25""" # close the SMTPserver from itself self.close() self.thread.join() 

Espero que te hagas una idea. La clase MyServer debe poder iniciar y detener la escucha del puerto 25 de forma no bloqueable, para poder consultar los mensajes mientras escucha (o no). El método de start inicia el asyncore.loop() , que, cuando se produce una recepción de un correo electrónico, se agrega a una lista interna. Similarmente, el método de stop debería poder detener este servidor, como se sugiere aquí .

A pesar del hecho de que este código no funciona como espero (asyncore parece funcionar para siempre, incluso llamo el método de stop anterior. El error que levanto se detecta dentro de la stop , pero no dentro de la función de target que contiene asyncore.loop() ), No estoy seguro de si mi enfoque del problema es sensato. Cualquier sugerencia para corregir el código anterior o proponer una implementación más sólida ( sin usar software de terceros), se agradece.

La solución provista puede no ser la solución más sofisticada, pero funciona de manera razonable y se ha probado.

En primer lugar, el problema con asyncore.loop() es que bloquea hasta que todos los canales asyncore se cierran, como lo señaló el usuario Wessie en un comentario anterior. Refiriéndose al ejemplo de smtp mencionado anteriormente, resulta que smtpd.SMTPServer hereda de asyncore.dispatcher (como se describe en la documentación de smtpd ), que responde a la pregunta de qué canal se cerrará.

Por lo tanto, la pregunta original se puede responder con el siguiente código de ejemplo actualizado:

 class CustomSMTPServer(smtpd.SMTPServer): # store the emails in any form inside the custom SMTP server emails = [] # overwrite the method that is used to process the received # emails, putting them into self.emails for example def process_message(self, peer, mailfrom, rcpttos, data): # email processing class MyReceiver(object): def start(self): """Start the listening service""" # here I create an instance of the SMTP server, derived from asyncore.dispatcher self.smtp = CustomSMTPServer(('0.0.0.0', 25), None) # and here I also start the asyncore loop, listening for SMTP connection, within a thread # timeout parameter is important, otherwise code will block 30 seconds after the smtp channel has been closed self.thread = threading.Thread(target=asyncore.loop,kwargs = {'timeout':1} ) self.thread.start() def stop(self): """Stop listening now to port 25""" # close the SMTPserver to ensure no channels connect to asyncore self.smtp.close() # now it is save to wait for the thread to finish, ie for asyncore.loop() to exit self.thread.join() # now it finally it is possible to use an instance of this class to check for emails or whatever in a non-blocking way def count(self): """Return the number of emails received""" return len(self.smtp.emails) def get(self): """Return all emails received so far""" return self.smtp.emails .... 

Entonces, al final, tengo un método de inicio y detención para iniciar y detener la escucha en el puerto 25 dentro de un entorno sin locking.

A partir de la otra pregunta, asyncore.loop no termina cuando no hay más conexiones.

Creo que estás un poco pensando demasiado en el enhebrado. Usando el código de la otra pregunta, puede iniciar un nuevo hilo que ejecute asyncore.loop mediante el siguiente fragmento de código:

 import threading loop_thread = threading.Thread(target=asyncore.loop, name="Asyncore Loop") # If you want to make the thread a daemon # loop_thread.daemon = True loop_thread.start() 

Esto lo ejecutará en un nuevo hilo y continuará hasta que todos los canales asyncore estén cerrados.

Debes considerar usar Twisted, en su lugar. http://twistedmatrix.com/trac/browser/trunk/doc/mail/examples/emailserver.tac demuestra cómo configurar un servidor SMTP con un enlace personalizable en la entrega.

La respuesta de Alex es la mejor, pero fue incompleta para mi caso de uso. Quería probar SMTP como parte de una prueba unitaria, lo que significaba construir el servidor SMTP falso dentro de mis objetos de prueba y el servidor no terminaría el subproceso de asyncio, así que tuve que agregar una línea para configurarlo en un subproceso del demonio para permitir el rest de la prueba de la unidad debe completarse sin bloquearse a la espera de que ese hilo de asyncio se una. También agregué el registro completo de todos los datos de correo electrónico para poder afirmar cualquier cosa enviada a través del SMTP.

Aquí está mi clase de SMTP falsa:

 class TestingSMTP(smtpd.SMTPServer): def __init__(self, *args, **kwargs): super(TestingSMTP, self).__init__(*args, **kwargs) self.emails = [] def process_message(self, peer, mailfrom, rcpttos, data, **kwargs): msg = {'peer': peer, 'mailfrom': mailfrom, 'rcpttos': rcpttos, 'data': data} msg.update(kwargs) self.emails.append(msg) class TestingSMTP_Server(object): def __init__(self): self.smtp = TestingSMTP(('0.0.0.0', 25), None) self.thread = threading.Thread() def start(self): self.thread = threading.Thread(target=asyncore.loop, kwargs={'timeout': 1}) self.thread.daemon = True self.thread.start() def stop(self): self.smtp.close() self.thread.join() def count(self): return len(self.smtp.emails) def get(self): return self.smtp.emails 

Y aquí es cómo es llamado por las clases unittest:

 smtp_server = TestingSMTP_Server() smtp_server.start() # send some emails assertTrue(smtp_server.count() == 1) # or however many you intended to send assertEqual(self.smtp_server.get()[0]['mailfrom'], 'first@fromaddress.com') # stop it when done testing smtp_server.stop()