思想:
基于udp协议向遍历子网内所有ip地址,发送udp数据包到一个关闭的端口(你认为可能关闭的端口,如不放心可指定多个),如果受到回应的ICMP包说明此主机存在。udp发送数据包开销比较小。可以在此基础上增减namp调用的功能,完整扫描发现的主机上的端口。
使用的python模块:netaddr,socket,ctypes,struct,threading,time,os
#!/usr/bin/python
from netaddr import IPAddress,IPNetwork
import time
import threading
from socket import *
from ctypes import *
import struct
import os
定义两个关键类,利用struct模块,构建类似c语言struct结构。将一个自己构建的列表命名为_fields_,根据ip头的结构,ip类的_fields_列表为:
_fields_=[
("ihl", c_ubyte,4),
("version", c_ubyte,4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num",c_ubyte),
("sum", c_ushort),
("src", c_uint),
("dst", c_uint),]
其中每一个元组代表结构中的一个变量,字符串是变量名,后面是类型。c_ubyte,4代表4位。
定义__new__方法,在类刚创建的时候将传入的socket_buffer数据写入结构。
def __new__(self, socket_buffer = None):
return self.from_buffer_copy(socket_buffer)
定义__init__方法,其执行的时候,__new__以经执行。在其中创建一个protocol_map字典,用于辨别协议类型。利用struct模块的pack函数将ip包中的源地址src和目的地址dst由网络大段字节顺序转化成小端字节顺序当作参数传给socket.inet_ntoa函数转化成十进制字符串赋值给类属性。随后利用字典指明协议类型。或是直接str
def __init__(self, socket_buffer = None):
protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
self.src_address = inet_ntoa(struct.pack("<L", self.src))
self.dst_address = inet_ntoa(struct.pack("<L", self.dst))
try:
self.protocol = protocol_map[self.protocol_num]
except:
self.protocol = str(protocol_num)
ICMP处理类(同ip)
class ICMP(Structure):
_fields_=[
("type", c_ubyte),
("code", c_ubyte),
("checksum", c_ushort),
("unused", c_ushort),
("next_hop_mtu",c_ushort),]
def __new__(self, raw_buffer = None):
return self.from_buffer_copy(raw_buffer)
def __init__(self, raw_buffer = None):
pass
def udp_sender(subnet, message):
#time.sleep(5)
sendudp = socket(AF_INET, SOCK_DGRAM)
for ip in IPNetwork(subnet):
try:
sendudp.sendto(message, ("%s"%ip,65532))
except:
pass
定义udp_sender函数,利用netaddr模块遍历子网ip发送udp数据包,
首先创建基于udp协议的socket套接字。其次,利用IPNetwork可以遍历子网ip。向65532端口(任意关闭端口)发送验证字符串(用于检验受到的icmp包是回应你发送的数据包)
def udp_sender(subnet, message):
#time.sleep(5)
sendudp = socket(AF_INET, SOCK_DGRAM)
for ip in IPNetwork(subnet):
try:
sendudp.sendto(message, ("%s"%ip,65532))
except:
pass
本程序只有两个线程如需要可增加,分块扫描。在主线程中处理数据包,字线程用于发送udp数据包。
t = threading.Thread(target = udp_sender, args = (subnet, message,))
t.start()
其中传入的subnet为”10.114.234.0/24“我们学校宿舍楼5楼子网之一,实际使用的时候可以改成其它的。message为验证字符串“PYTHONSNIFFER”
主线程数据包处理是关键地方,构建一个原始套接字用于监听,首先根据os.name的值是否为nt判断是否为windows系统。因为原始套接字在linux下只能接受icmp包。然后根据判断设置原始套接字接受数据包的协议类型。
f os.name == "nt":
sock_protocol = IPPROTO_IP
else:
sock_protocol = IPPROTO_ICMP
接着创建原始套机字。socket.socket()函数的第二个参数为SOCK_RAW即建立了原始套接字。注意原始套接字需要在root下创建。随后bind,setsockopt函数设置套接字保留ip头。sniffer = socket(AF_INET, SOCK_RAW, sock_protocol)
sniffer.bind((host, port))
sniffer.setsockopt(IPPROTO_IP, IP_HDRINCL, 1)
如果是windows系统再利用ioctl打开监听模式
if os.name == "nt":
sniffer.ioctl(SIO_RCVALL, RCVALL_ON)
随后循环接受数据并实例化IP,ICMP对象。因为ip头是接受的保留ip头的数据包的前20字节。8字节icmp头需要先去掉20字节ip头,切片从20开始操作。先实例化IP对象处理ip头,如果接受为icmp包,再实例化ICMP对象,判断IP头src是否在子网范围内,然后判断头部之后的数据是否为验证字符串PYTHONSNIFFER。
while True:
data = sniffer.recvfrom(65565)[0]
ip_header = IP(data[:20])
# print "protocol: %s -> %s"%(ip_header.src_address, ip_header.dst_address)
if ip_header.protocol == "ICMP":
offset = ip_header.ihl*4
icmp_header = ICMP(data[offset:offset+sizeof(ICMP)])
# print "ICMP Type: %d, Code: %d" % (icmp_header.type, icmp_header.code)
if icmp_header.code == 3 and icmp_header.type == 3:
if IPAddress(ip_header.src_address) in IPNetwork(subnet):
if data[len(data) - len(message):] == "PYTHONSNIFFER":
print "Host Up: %s"%(ip_header.src_address)
最后如果遇到键盘异常退出,关闭windows的混杂模式。
扫描10.114.234.0效果:
完整源代码如下:
#!/usr/bin/python
from netaddr import IPAddress,IPNetwork
import time
import threading
from socket import *
from ctypes import *
import struct
import os
#host = "127.0.0.1"
host = "10.114.234.133"
port = 0
subnet = "10.114.234.0/24"
message = "PYTHONSNIFFER"
class IP(Structure):
_fields_=[
("ihl", c_ubyte,4),
("version", c_ubyte,4),
("tos", c_ubyte),
("len", c_ushort),
("id", c_ushort),
("offset", c_ushort),
("ttl", c_ubyte),
("protocol_num",c_ubyte),
("sum", c_ushort),
("src", c_uint),
("dst", c_uint),]
def __new__(self, socket_buffer = None):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer = None):
protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
self.src_address = inet_ntoa(struct.pack("<L", self.src))
self.dst_address = inet_ntoa(struct.pack("<L", self.dst))
try:
self.protocol = protocol_map[self.protocol_num]
except:
self.protocol = str(protocol_num)
class ICMP(Structure):
_fields_=[
("type", c_ubyte),
("code", c_ubyte),
("checksum", c_ushort),
("unused", c_ushort),
("next_hop_mtu",c_ushort),]
def __new__(self, raw_buffer = None):
return self.from_buffer_copy(raw_buffer)
def __init__(self, raw_buffer = None):
pass
def udp_sender(subnet, message):
#time.sleep(5)
sendudp = socket(AF_INET, SOCK_DGRAM)
for ip in IPNetwork(subnet):
try:
sendudp.sendto(message, ("%s"%ip,65532))
except:
pass
t = threading.Thread(target = udp_sender, args = (subnet, message,))
t.start()
if os.name == "nt":
sock_protocol = IPPROTO_IP
else:
sock_protocol = IPPROTO_ICMP
sniffer = socket(AF_INET, SOCK_RAW, sock_protocol)
sniffer.bind((host, port))
sniffer.setsockopt(IPPROTO_IP, IP_HDRINCL, 1)
if os.name == "nt":
sniffer.ioctl(SIO_RCVALL, RCVALL_ON)
try:
while True:
data = sniffer.recvfrom(65565)[0]
ip_header = IP(data[:20])
# print "protocol: %s -> %s"%(ip_header.src_address, ip_header.dst_address)
if ip_header.protocol == "ICMP":
offset = ip_header.ihl*4
icmp_header = ICMP(data[offset:offset+sizeof(ICMP)])
# print "ICMP Type: %d, Code: %d" % (icmp_header.type, icmp_header.code)
if icmp_header.code == 3 and icmp_header.type == 3:
if IPAddress(ip_header.src_address) in IPNetwork(subnet):
if data[len(data) - len(message):] == "PYTHONSNIFFER":
print "Host Up: %s"%(ip_header.src_address)
except KeyboardInterrupt:
if os.name == "nt":
sniffer.ioctl(SIO_RCVALL, RCVALL_OFF)