Hacer un escáner de puertos rápido

Así que estoy haciendo un escáner de puertos en Python

import socket

# Placeholder: replace with the address of the host to scan.
ip = "External IP"


def porttry(ip, port, timeout=0.5):
    """Report whether a TCP connection to ``ip:port`` can be established.

    A fresh socket is created for every attempt: a socket that has already
    connected cannot be connected a second time, so sharing a single socket
    across ports makes every probe after the first success fail.

    Args:
        ip: Host name or IPv4 address to probe.
        port: TCP port number to probe.
        timeout: Seconds to wait for the connection before giving up, so
            that filtered ports do not stall the scan.

    Returns:
        True if the connection succeeded, None otherwise.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(timeout)
    try:
        s.connect((ip, port))
        return True
    except socket.error:
        # Covers refused connections, timeouts and name-resolution failures.
        return None
    finally:
        # try/finally instead of "with": Python 2 sockets are not context
        # managers, and this script also targets Python 2.
        s.close()


if __name__ == "__main__":
    for port in range(0, 10000):
        value = porttry(ip, port)
        if value is None:
            print("Port not opened on %d" % port)
        else:
            print("Port opened on %d" % port)
            break
    # Keep the console window open until the user presses Enter.
    try:
        raw_input()
    except NameError:
        # Python 3 renamed raw_input() to input().
        input()

Pero esto es demasiado lento; de alguna manera quiero poder abandonar el intento de conexión después de un período de tiempo sin obtener respuesta.

Además de configurar el tiempo de espera del socket, también puede aplicar subprocesos múltiples (multithreading) para acelerar el proceso. En el mejor de los casos, será N veces más rápido cuando tenga N puertos para escanear.

 # This script runs on Python 3
import socket
import threading


def TCP_connect(ip, port_number, delay, output):
    """Probe one TCP port and record the outcome in ``output``.

    Args:
        ip: Host name or IP address to probe.
        port_number: TCP port to connect to.
        delay: Socket timeout in seconds (int or float).
        output: Dict shared between threads; ``output[port_number]`` is set
            to 'Listening' when the connection succeeds and to '' otherwise.
    """
    try:
        # The context manager closes the socket on every path, so a large
        # scan does not leak one file descriptor per probed port.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as TCPsock:
            TCPsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            TCPsock.settimeout(delay)
            TCPsock.connect((ip, port_number))
    except OSError:
        # Refused, timed out, unresolvable host, or no descriptor available.
        output[port_number] = ''
    else:
        output[port_number] = 'Listening'


def scan_ports(host_ip, delay, ports=range(10000), max_threads=256):
    """Scan ``ports`` on ``host_ip`` concurrently and print the open ones.

    Threads are started in batches of at most ``max_threads``. Starting one
    thread (and one socket) per port all at once can exceed the process's
    file-descriptor limit, and those failures would be reported as closed
    ports.

    Args:
        host_ip: Host name or IP address to scan.
        delay: Per-connection timeout in seconds.
        ports: Iterable of port numbers to probe (default: 0-9999).
        max_threads: Upper bound on simultaneously running probe threads.

    Returns:
        Sorted list of the ports that accepted a connection.
    """
    ports = list(ports)
    max_threads = max(1, max_threads)
    output = {}  # port -> 'Listening' or ''

    for start in range(0, len(ports), max_threads):
        batch = [
            threading.Thread(target=TCP_connect,
                             args=(host_ip, port, delay, output))
            for port in ports[start:start + max_threads]
        ]
        for thread in batch:
            thread.start()
        # Wait for the whole batch before opening more sockets.
        for thread in batch:
            thread.join()

    # Printing listening ports from small to large
    open_ports = sorted(p for p in ports if output.get(p) == 'Listening')
    for port in open_ports:
        print(str(port) + ': ' + output[port])
    return open_ports


def main():
    """Ask for the target host and timeout, then run the scan."""
    host_ip = input("Enter host IP: ")
    # float() rather than int() so sub-second timeouts such as 0.5 work.
    delay = float(input("How many seconds the socket is going to wait until timeout: "))
    scan_ports(host_ip, delay)


if __name__ == "__main__":
    main()

Considere establecer un tiempo de espera, en lugar de un bucle for, utilizando socket.setdefaulttimeout(timeout).

Esto debería ser un poco más rápido.

 #-*-coding:utf8;-*-
#qpy:3
#qpy:console
import socket
import os

# Default timeout, in seconds, applied to the probing socket.
DEFAULT_TIMEOUT = 0.5

# Value returned by socket.connect_ex() when the connection succeeded.
SUCCESS = 0


def check_port(*host_port, timeout=DEFAULT_TIMEOUT):
    """Try to connect to a host on a given TCP port.

    Args:
        *host_port: The host and the port, passed as two positional
            arguments, e.g. ``check_port('example.org', 80)``.
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        True if the connection was established (the port is open). False if
        the connection timed out, was refused, or the address could not be
        resolved; in all of those cases the port is treated as closed.
    """
    # The context manager closes the socket on every path, including when
    # connect_ex() raises.
    with socket.socket() as sock:
        sock.settimeout(timeout)
        # SO_REUSEADDR lets the kernel reuse a local socket in TIME_WAIT
        # state without waiting for its natural timeout to expire.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        try:
            # connect_ex() returns an errno value instead of raising for
            # errors from the C-level connect() call; 0 means success.
            # Compare with ==, not "is": identity of ints is an
            # implementation detail.
            return sock.connect_ex(host_port) == SUCCESS
        except OSError:
            # Other problems, such as "host not found", still raise.
            return False


if __name__ == '__main__':
    con = check_port('www.google.com', 83)
    print(con)

Creo que este fragmento te podría ayudar: http://www.coderholic.com/python-port-scanner/

socket.setdefaulttimeout(0.5) ¡Esto hará que el programa sea más rápido!

socket.setdefaulttimeout(tiempo)

se usa para fijar durante cuánto tiempo se intentará conectar con el puerto. Por ejemplo, si se envía una solicitud con un tiempo de espera de 2 segundos, se intentará conectar con el puerto durante 2 segundos; si no hay respuesta de ese puerto en esos 2 segundos, se contará como puerto cerrado.

Aquí hay un escáner de puertos rápido y simple, que escanea 100000 puertos en 180 segundos:

 # Threaded TCP scanner: probes ports 1-99 of `target`, one thread per port.
import threading
import socket

target = 'pythonprogramming.net'
# NOTE: resolving the name once with socket.gethostbyname(target) would avoid
# one DNS lookup per thread.


def portscan(port, host=None):
    """Probe a single TCP port and print a line if it is open.

    Args:
        port: TCP port number to probe.
        host: Host to probe; defaults to the module-level ``target``.

    Returns:
        True if the connection succeeded, False otherwise.
    """
    if host is None:
        host = target
    try:
        # connect() returns None, so the socket itself is what must be
        # closed; the context manager does that on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(0.5)
            s.connect((host, port))
    except OSError:
        # Closed, filtered or unreachable: stay silent.
        return False
    print('Port :', port, "is open.")
    return True


if __name__ == '__main__':
    for x in range(1, 100):
        t = threading.Thread(target=portscan, kwargs={'port': x})
        t.start()

El siguiente escáner de puertos tiene algunas constantes definidas en la parte superior que puede modificar según sea necesario:

  • PURPOSE – mensaje de ayuda para la línea de comandos
  • PORTS – rango de puertos que le gustaría escanear
  • POOL_SIZE – número de procesos con los que escanear
  • TIMEOUT – cuánto tiempo se debe esperar la conexión con el servidor

Siéntase libre de adaptar esto de acuerdo a sus requerimientos. ¿Tal vez agregar algunos argumentos de línea de comandos?

 #! /usr/bin/env python3
import argparse
import collections
import itertools
import multiprocessing
import operator
import socket

PURPOSE = 'Scan for open ports on a computer.'
PORTS = range(1 << 16)
POOL_SIZE = 1 << 8
TIMEOUT = 0.01


def main():
    """Get computer to scan, connect with process pool, and show open ports."""
    parser = argparse.ArgumentParser(description=PURPOSE)
    parser.add_argument('host', type=str, help='computer you want to scan')
    host = parser.parse_args().host
    # Each worker process calls socket.setdefaulttimeout(TIMEOUT) on startup.
    with multiprocessing.Pool(POOL_SIZE, socket.setdefaulttimeout,
                              [TIMEOUT]) as pool:
        results = pool.imap_unordered(test, ((host, port) for port in PORTS))
        servers = filter(operator.itemgetter(0), results)
        numbers = map(operator.itemgetter(1), servers)
        # The iterators above are lazy; consume them while the pool is alive.
        ordered = sorted(numbers)
    print(f'Ports open on {host}:', *format_ports(ordered), sep='\n ')


field_names = 'family', 'socket_type', 'protocol', 'canon_name', 'address'
AddressInfo = collections.namedtuple('AddressInfo', field_names)
del field_names


def test(address):
    """Try connecting to the server and return whether or not it succeeded.

    Args:
        address: A ``(host, port)`` pair.

    Returns:
        ``(True, port)`` if a TCP connection to any resolved address of the
        host succeeded, ``(False, port)`` otherwise.
    """
    host, port = address
    # Ask for stream (TCP) sockets only. Without the filter getaddrinfo()
    # can also return datagram entries, and connect() on a UDP socket
    # succeeds without contacting the peer, so closed ports would be
    # reported as open.
    candidates = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    for info in itertools.starmap(AddressInfo, candidates):
        try:
            probe = socket.socket(info.family, info.socket_type, info.protocol)
        except OSError:
            continue
        with probe:
            try:
                probe.connect(info.address)
            except OSError:
                continue
            try:
                probe.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The peer may already have dropped the connection; the
                # port was still open, so do not let this abort the scan.
                pass
            return True, port
    return False, port


def format_ports(ports):
    """Convert port numbers into strings and show all associated services.

    Yields one ``'<port> = <service>'`` line per port, using '?' when the
    service name is unknown, or the single line 'None' if ``ports`` is empty.
    """
    if ports:
        for port in ports:
            try:
                service = socket.getservbyport(port)
            except OSError:
                service = '?'
            yield f'{port:<5} = {service}'
    else:
        yield 'None'


if __name__ == '__main__':
    main()