Usando ssl context.set_servername_callback en Python

Tengo el objective de permitir que un cliente ssl seleccione de un número de pares de certificados válidos del servidor. El cliente tiene un certificado de CA que utilizará para validar el certificado proveniente del servidor.

Así que para intentar lograr esto, estoy usando el ssl.SSLContext.set_servername_callback() en el servidor en combinación con ssl.SSLSocket.wrap_socket's parameter: ssl.SSLSocket.wrap_socket ssl.SSLSocket.wrap_socket's parameter: server_hostname` para intentar que el cliente especifique qué par de llaves usar. Así es como se ve el código:

Código del servidor:

 import sys import pickle import ssl import socket import select request = {'msgtype': 0, 'value': 'Ping', 'test': [chr(i) for i in range(256)]} response = {'msgtype': 1, 'value': 'Pong'} def handle_client(c, a): print("Connection from {}:{}".format(*a)) req_raw = c.recv(10000) req = pickle.loads(req_raw) print("Received message: {}".format(req)) res = pickle.dumps(response) print("Sending message: {}".format(response)) c.send(res) def run_server(hostname, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((hostname, port)) s.listen(8) print("Serving on {}:{}".format(hostname, port)) try: while True: (c, a) = s.accept() def servername_callback(sock, req_hostname, cb_context, as_callback=True): print('Loading certs for {}'.format(req_hostname)) server_cert = "ssl/{}/server".format(req_hostname) # NOTE: This use of socket input is INSECURE cb_context.load_cert_chain(certfile="{}.crt".format(server_cert), keyfile="{}.key".format(server_cert)) # Seems like this is designed usage: https://github.com/python/cpython/blob/3.4/Modules/_ssl.c#L1469 sock.context = cb_context return None context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) context.set_servername_callback(servername_callback) default_cert = "ssl/3.1/server" context.load_cert_chain(certfile="{}.crt".format(default_cert), keyfile="{}.key".format(default_cert)) ssl_sock = context.wrap_socket(c, server_side=True) try: handle_client(ssl_sock, a) finally: c.close() except KeyboardInterrupt: s.close() if __name__ == '__main__': hostname = '' port = 6789 run_server(hostname, port) 

Codigo del cliente:

 import sys import pickle import socket import ssl request = {'msgtype': 0, 'value': 'Ping', 'test': [chr(i) for i in range(256)]} response = {'msgtype': 1, 'value': 'Pong'} def client(hostname, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) print("Connecting to {}:{}".format(hostname, port)) s.connect((hostname, port)) ssl_sock = ssl.SSLSocket(sock=s, ca_certs="server_old.crt", cert_reqs=ssl.CERT_REQUIRED, server_hostname='3.2') print("Sending message: {}".format(request)) req = pickle.dumps(request) ssl_sock.send(req) resp_raw = ssl_sock.recv(10000) resp = pickle.loads(resp_raw) print("Received message: {}".format(resp)) ssl_sock.close() if __name__ == '__main__': hostname = 'localhost' port = 6789 client(hostname, port) 

Pero no está funcionando. Lo que parece estar sucediendo es que se llama a servername_callback , se obtiene el “nombre de host” especificado y la llamada a context.load_cert_chain dentro de la callback no falla (aunque falla si se le da una ruta que no existe). Sin embargo, el servidor siempre devuelve el par de certificados que se cargó antes de llamar a context.wrap_socket(c, server_side=True) . Entonces, mi pregunta es: ¿hay alguna manera, dentro de servername_callback , para modificar el par de claves utilizado por el contexto ssl, y obtener el certificado de ese par de claves que se usará para la conexión?

También debo tener en cuenta que verifiqué el tráfico y que el certificado del servidor NO se enviará hasta después de que la función servername_callback devuelva (y nunca se enviará si no se completa correctamente o devuelve un valor de “falla”).

En su callback, cb_context es el mismo contexto en el que se llamó a wrap_socket() , y lo mismo que socket.context , por lo que socket.context = cb_context establece el contexto en el mismo contexto que antes.

Cambiar la cadena de certificados de un contexto no afecta al certificado utilizado para la operación de wrap_socket() actual. La explicación de esto radica en cómo openssl crea sus objetos subyacentes, en este caso, las estructuras SSL subyacentes ya se han creado y utilizan copias de las cadenas :

NOTAS

Las cadenas asociadas con una estructura SSL_CTX se copian a cualquier estructura SSL cuando se llama a SSL_new (). Las estructuras SSL no se verán afectadas por ninguna cadena que se cambie posteriormente en el SSL_CTX principal.

Cuando se configura un nuevo contexto, las estructuras SSL se actualizan, pero esa actualización no se realiza cuando el nuevo contexto es igual al anterior .

sock.context establecer sock.context en un contexto diferente para que funcione. Actualmente crea una instancia de un nuevo contexto en cada nueva conexión entrante, que no es necesario. En su lugar, debe crear una instancia de su contexto estándar una sola vez y reutilizarlo. Lo mismo ocurre con los contextos cargados dinámicamente, puede crearlos todos en el inicio y colocarlos en un dict para que pueda hacer una búsqueda, por ejemplo:

 ... contexts = {} for hostname in os.listdir("ssl"): print('Loading certs for {}'.format(hostname)) server_cert = "ssl/{}/server".format(hostname) context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) context.load_cert_chain(certfile="{}.crt".format(server_cert), keyfile="{}.key".format(server_cert)) contexts[hostname] = context def servername_callback(sock, req_hostname, cb_context, as_callback=True): context = contexts.get(req_hostname) if context is not None: sock.context = context else: pass # handle unknown hostname case def run_server(hostname, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((hostname, port)) s.listen(8) print("Serving on {}:{}".format(hostname, port)) context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) context.set_servername_callback(servername_callback) default_cert = "ssl/3.1/server" context.load_cert_chain(certfile="{}.crt".format(default_cert), keyfile="{}.key".format(default_cert)) try: while True: (c, a) = s.accept() ssl_sock = context.wrap_socket(c, server_side=True) try: handle_client(ssl_sock, a) finally: c.close() except KeyboardInterrupt: s.close() 

Así que después de ver este post y algunos otros en línea, armé una versión del código anterior, que funcionó perfectamente para mí … así que pensé en compartirlo. En caso de que ayude a alguien más.

 import sys import ssl import socket import os from pprint import pprint DOMAIN_CONTEXTS = {} ssl_root_path = "c:/ssl/" # ---------------------------------------------------------------------------------------------------------------------- # # As an example create domains in the ssl root path...ie # # c:/ssl/example.com # c:/ssl/johndoe.com # c:/ssl/test.com # # And then create self signed ssl certificates for each domain to test... and put them in the corresponding domain # directory... in this case the cert and key files are called cert.pem, and key.pem.... # def setup_ssl_certs(): global DOMAIN_CONTEXTS for hostname in os.listdir(ssl_root_path): #print('Loading certs for {}'.format(hostname)) # Establish the certificate and key folder...for the various domains... server_cert = '{rp}{hn}/'.format(rp=ssl_root_path, hn=hostname) # Setup the SSL Context manager object, for authentication context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) # Load the certificate file, and key file...into the context manager. context.load_cert_chain(certfile="{}cert.pem".format(server_cert), keyfile="{}key.pem".format(server_cert)) # Set the context object to the global dictionary DOMAIN_CONTEXTS[hostname] = context # Uncomment for testing only. #pprint(contexts) # ---------------------------------------------------------------------------------------------------------------------- def servername_callback(sock, req_hostname, cb_context, as_callback=True): """ This is a callback function for the SSL Context manager, this is what does the real work of pulling the domain name in the origional request. """ # Uncomment for testing only #print(sock) #print(req_hostname) #print(cb_context) context = DOMAIN_CONTEXTS.get(req_hostname) if context: try: sock.context = context except Exception as error: print(error) else: sock.server_hostname = req_hostname else: pass # handle unknown hostname case def handle_client(conn, a): request_domain = conn.server_hostname request = conn.recv() client_ip = conn.getpeername()[0] resp = 'Hello {cip} welcome, from domain {d} !'.format(cip=client_ip, d=request_domain) conn.write(b'HTTP/1.1 200 OK\n\n%s' % resp.encode()) def run_server(hostname, port): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) s.bind((hostname, port)) s.listen(8) #print("Serving on {}:{}".format(hostname, port)) context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH) # For Python 3.4+ context.set_servername_callback(servername_callback) # Only available in 3.7 !!!! have not tested it yet... #context.sni_callback(servername_callback) default_cert = "{rp}default/".format(rp=ssl_root_path) context.load_cert_chain(certfile="{}cert.pem".format(default_cert), keyfile="{}key.pem".format(default_cert)) context.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 # optional context.set_ciphers('EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH') try: while True: ssock, addr = s.accept() try: conn = context.wrap_socket(ssock, server_side=True) except Exception as error: print('!!! Error, {e}'.format(e=error)) except ssl.SSLError as e: print(e) else: handle_client(conn, addr) if conn: conn.close() #print('Connection closed !') except KeyboardInterrupt: s.close() # ---------------------------------------------------------------------------------------------------------------------- def main(): setup_ssl_certs() # Don't forget to update your static name resolution... ie example.com = 127.0.0.1 run_server('example.com', 443) # ---------------------------------------------------------------------------------------------------------------------- if __name__ == '__main__': main()