老李分享:使用 Python 的 Socket 模块开发 UDP 扫描工具

poptest是业内唯一的测试开发工程师培训机构,测试开发工程师主要是为测试服务开发测试工具,在工作中要求你做网络级别的安全性测试,但是条件限制你无法用商业工具,所以自己动手要写测试工具,在这里我们在测试开发工程师的就业培训中构建了一个场景,就是自己开发udp扫描工具,我们在现阶段主要是用python为主要开发语言来实现各种场景下的测试,而quicktestprofessional的培训我们已经免费。

首先我们了解下概念:套接字能够访问底层网络信息。如,我们可以用它来检查IPICMP报头,他们都是OSI模型中网络层协议。

使用UDP数据包可以当发送信息穿越子网时,不同于TCP,需要三次握手。我们需要做的仅仅是等待ICMP报文回应,来判断对方主机是否可用或不可访问。ICMP协议本质上是一个特殊的控制协议,它可以指示错误报告和控制机器数据传输的的行为。

我们先创建一个socket 并将它绑定到一个外部网卡。这个网卡要启用混淆模式(promiscuous mode),他主要是获取经过这个网卡的所有数据包,包括目标地址不是它的数据包。使用 Windows 时要注意一点:我们需要发送一个 IOCTL 包才能将网卡设置为混淆模式。另外,虽然 linux 需要使用 ICMP,Windows 却可以以一种独立于协议的方式来嗅探收到的数据包。

import socket
import os

# host to listen
HOST = ‘192.168.1.114‘

def sniffing(host, win, socket_prot):
  while 1:
    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_prot)
    sniffer.bind((host, 0))

    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

    if win == 1:
      sniffer.ioctl(socket.SIO_RCVALL, socket_RCVALL_ON)

    
    print sniffer.recvfrom(65565)

def main(host):
  if os.name == ‘nt‘:
    sniffing(host, 1, socket.IPPROTO_IP)
  else:
    sniffing(host, 0, socket.IPPROTO_ICMP)

if __name__ == ‘__main__‘:
  main(HOST)

在终端中运行如下命令进行测试:   sudo python sniffer.py

(‘E\x00\x00T\xb3\xec\x00\x005\x01\xe4\x13J}\xe1\x11\xc0\xa8\x01r\x00\x00v\xdfx\xa2\x00\x01sr\x98T\x00\x00\x00\x008\xe3\r\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\‘()*+,-./01234567‘, (‘74.125.225.17‘, 0))
(‘E\x00\x00T\xb4\x1b\x00\x005\x01\xe3\xe4J}\xe1\x11\xc0\xa8\x01r\x00\x00~\xd7x\xa2\x00\x02tr\x98T\x00\x00\x00\x00/\xea\r\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\‘()*+,-./01234567‘, (‘74.125.225.17‘, 0))

我们需要对这些数据包进行解码
下面我们来研究下协议数据包组成,进行IP ICMP层解码

IP 头
典型的 IP 头有如下结构,每个字段都对应一个变量

ICMP头

ICMP 由于内容的不同其消息类型也不同,但每个消息都包括三个一致的元素:type,code (告知接收主机ICMP消息的解码类型)和 checksum。

对于我们的扫描器,如果得到的 type 和 code 值是3,这意味着 Destination Unreachable(目标不可达)和 Port Unreachable (端口不可达) ICMP 消息错误
为描述 ICMP 消息头,用 python 的 ctypes 库来创建一个类

import ctypes

class ICMP(ctypes.Structure):
  _fields_ = [
  (‘type‘, ctypes.c_ubyte),
  (‘code‘, ctypes.c_ubyte),
  (‘checksum‘, ctypes.c_ushort),
  (‘unused‘, ctypes.c_ushort),
  (‘next_hop_mtu‘,ctypes.c_ushort)
  ]

  def __new__(self, socket_buffer):
     return self.from_buffer_copy(socket_buffer)

  def __init__(self, socket_buffer):
    pass

编写消息头解码器

开发 IP/ICMP 消息头解码器。脚本创建了一个 sniffer socket,然后在循环中持续读取数据包并进行解码。
注意代码中将 IP 头的前20个字节读取到了缓存,然后再打印消息头的变量。
ICMP 头数据如下:

import socket
import os
import struct
import ctypes
from ICMPHeader import ICMP

# host to listen on
HOST = ‘192.168.xxx.xxx‘

def main():
  socket_protocol = socket.IPPROTO_ICMP
  sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
  sniffer.bind(( HOST, 0 ))
  sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

  while 1:
    raw_buffer = sniffer.recvfrom(65565)[0]
    ip_header = raw_buffer[0:20]
    iph = struct.unpack(‘!BBHHHBBH4s4s‘ , ip_header)

    version_ihl = iph[0]
    version = version_ihl >> 4
    ihl = version_ihl & 0xF
    iph_length = ihl * 4
    ttl = iph[5]
    protocol = iph[6]
    s_addr = socket.inet_ntoa(iph[8]);
    d_addr = socket.inet_ntoa(iph[9]);

    print ‘IP -> Version:‘ + str(version) + ‘, Header Length:‘ + str(ihl) + \
    ‘, TTL:‘ + str(ttl) + ‘, Protocol:‘ + str(protocol) + ‘, Source:‘\
    + str(s_addr) + ‘, Destination:‘ + str(d_addr)

    buf = raw_buffer[iph_length:iph_length + ctypes.sizeof(ICMP)]
    icmp_header = ICMP(buf)

    print "ICMP -> Type:%d, Code:%d" %(icmp_header.type, icmp_header.code) + ‘\n‘

if __name__ == ‘__main__‘:
  main()

我们运行 traceroute看下数据包情况:

$ traceroute www.google.com
traceroute to www.google.com (74.125.226.50), 30 hops max, 60 byte packets
1 * * *
2 * * *
3 67.59.255.137 (67.59.255.137) 17.183 ms 67.59.255.129 (67.59.255.129) 70.563 ms 67.59.255.137 (67.59.255.137) 21.480 ms
4 451be075.cst.lightpath.net (65.19.99.117) 14.639 ms rtr102.wan.hcvlny.cv.net (65.19.99.205) 24.086 ms 451be075.cst.lightpath.net (65.19.107.117) 24.025 ms
5 64.15.3.246 (64.15.3.246) 24.005 ms 64.15.0.218 (64.15.0.218) 23.961 ms 451be0c2.cst.lightpath.net (65.19.120.194) 23.935 ms
6 72.14.215.203 (72.14.215.203) 23.872 ms 46.943 ms *
7 216.239.50.141 (216.239.50.141) 48.906 ms 46.138 ms 46.122 ms
8 209.85.245.179 (209.85.245.179) 46.108 ms 46.095 ms 46.074 ms
9 lga15s43-in-f18.1e100.net (74.125.226.50) 45.997 ms 19.507 ms 16.607 ms

会得到这种输出 (注意 ICMP 的响应类型):

sudo python ip_header_decode.py
IP -> Version:4, Header Length:5, TTL:252, Protocol:1, Source:65.19.99.117, Destination:192.168.1.114
ICMP -> Type:11, Code:0(...)IP -> Version:4, Header Length:5, TTL:250, Protocol:1, Source:72.14.215.203, Destination:192.168.1.114
ICMP -> Type:11, Code:0
IP -> Version:4, Header Length:5, TTL:56, Protocol:1, Source:74.125.226.50, Destination:192.168.1.114
ICMP -> Type:3, Code:3
IP -> Version:4, Header Length:5, TTL:249, Protocol:1, Source:216.239.50.141, Destination:192.168.1.114
ICMP -> Type:11, Code:0(...)IP -> Version:4, Header Length:5, TTL:56, Protocol:1, Source:74.125.226.50, Destination:192.168.1.114
ICMP -> Type:3, Code:3

开发扫描器

在编写完整的扫描器前首先要安装 netaddr,它是一个用于表示和处理网络地址的 python 库。
Netaddr 提供了操作 IPv4,IPv6 和子网 Mac 等地址的能力。它非常有用,因为我们会用到子网掩码,如192.168.1.0/24
sudo pip install netaddr

我们可以使用如下的代码段来测试这个库 (成功会打印“OK”):

import netaddr

ip = ‘192.168.xxx.xxx‘

if ip in netaddr.IPNetwork(‘192.168.xxx.0/24‘):

    print(‘OK!‘)

进一步强化扫描器

我们会将上面所提到的组织在一起来完成我们的扫描器,然后添加一个循环来向目标子网内的所有地址发送 UDP 数据报。

import threading

import time

import socket

import os

import struct

from netaddr import IPNetwork, IPAddress

from ICMPHeader import ICMP

import ctypes

 

HOST = ‘192.168.xxx.xxx‘

 

SUBNET = ‘192.168.xxx.0/24‘

 

MESSAGE = ‘hellooooo‘

 

def udp_sender(SUBNET, MESSAGE):

    time.sleep(5)

    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    for ip in IPNetwork(SUBNET):

        try:

            sender.sendto(MESSAGE, ("%s" % ip, 65212))

        except:

            pass

def main():

    = threading.Thread(target=udp_sender, args=(SUBNET, MESSAGE))

    t.start()

    socket_protocol = socket.IPPROTO_ICMP

    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

    sniffer.bind(( HOST, 0 ))

    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

   

    while 1:

        raw_buffer = sniffer.recvfrom(65565)[0]

        ip_header = raw_buffer[0:20]

        iph = struct.unpack(‘!BBHHHBBH4s4s‘ , ip_header)

    

        version_ihl = iph[0]

        ihl = version_ihl & 0xF

        iph_length = ihl * 4

        src_addr = socket.inet_ntoa(iph[8]);

     

        buf = raw_buffer[iph_length:iph_length + ctypes.sizeof(ICMP)]

        icmp_header = ICMP(buf)

        # check for the type 3 and code and within our target subnet

        if icmp_header.code == 3 and icmp_header.type == 3:

            if IPAddress(src_addr) in IPNetwork(SUBNET):

                if raw_buffer[len(raw_buffer) - len(MESSAGE):] == MESSAGE:

                    print("Host up: %s" % src_addr)

if __name__ == ‘__main__‘:

    main()

运行后得到的结果如下:

sudo python scanner.py

Host up: 192.168.1.114(...)

如果感兴趣可以进一步强化代码,进一步加强扫描器的功能,其实大家可以看到这里技术的难点是你对协议的熟悉程度,我们在做性能测试的过程中也会通过loadrunner去模拟协议的请求,所以关键是实现的内容,语言可以任意选择。欢迎大家咨询poptest测试开发工程师培训。

时间: 2024-10-31 10:53:10

老李分享:使用 Python 的 Socket 模块开发 UDP 扫描工具的相关文章

python之socket模块

UDP client #!/usr/bin/env python2.7 #-*-coding:utf-8 -*- import socket s=socket.socket(socket.AF_INET,socket.SOCK_DGRAM) s.sendto("hello",("localhost",8001)) data,addr = s.recvfrom(1024) print "receive data:%s from %s" % (dat

python基础===socket模块的讲解(转)

一.网络知识的一些介绍 socket 是网络连接端点.例如当你的Web浏览器请求www.jb51.net上的主页时,你的Web浏览器创建一个socket并命令它去连接 www.jb51.net的Web服务器主机,Web服务器也对来自的请求在一个socket上进行监听.两端使用各自的socket来发送和 接收信息. 在使用的时候,每个socket都被绑定到一个特定的IP地址和端口.IP地址是一个由4个数组成的序列,这4个数均是范围 0~255中的值(例如,220,176,36,76):端口数值的取

Python的socket模块详解

一.网络知识的一些介绍 socket 是网络连接端点.例如当你的Web浏览器请求www.jb51.net上的主页时,你的Web浏览器创建一个socket并命令它去连接 www.jb51.net的Web服务器主机,Web服务器也对来自的请求在一个socket上进行监听.两端使用各自的socket来发送和 接收信息. 在使用的时候,每个socket都被绑定到一个特定的IP地址和端口.IP地址是一个由4个数组成的序列,这4个数均是范围 0~255中的值(例如,220,176,36,76):端口数值的取

分享《Python 3网络爬虫开发实战》中文PDF+源代码

下载:https://pan.baidu.com/s/1S9PAGO0123_7Csz14z-e2g 更多资料分享:http://blog.51cto.com/3215120 <Python 3网络爬虫开发实战>中文PDF+源代码 中文版PDF,606页,带目录和书签,文字可以复制粘贴. 配套源代码: 经典书籍,讲解详细: 如图: 原文地址:http://blog.51cto.com/3215120/2312586

分享《Python 3网络爬虫开发实战》中文PDF+源代码+崔庆才

下载:https://pan.baidu.com/s/1XNJwYJRurKN1bScroixpYA更多资料分享:http://blog.51cto.com/14087171 <Python 3网络爬虫开发实战>中文PDF+源代码 中文版PDF,606页,带目录和书签,文字可以复制粘贴. 配套源代码: 经典书籍,讲解详细: 如图: 原文地址:http://blog.51cto.com/14087171/2321606

python中socket模块

一.初识socket      socket 是网络连接端点,每个socket都被绑定到一个特定的IP地址和端口.IP地址是一个由4个数组成的序列,这4个数均是范围 0~255中的值(例如,220,176,36,76):端口数值的取值范围是0~65535.端口数小于1024的都是为众所周知的网络服务所保留的 (例如Web服务使用的80端口):最大的保留数被存储在socket模块的IPPORT_RESERVED变量中.你也可以为你的程序使用另外的端口数 值. 不是所有的IP地址都对世界的其它地方可

使用 Python 的 Socket 模块构建一个 UDP 扫描工具

译文:oschina 英文:bt3gl 当涉及到对一些目标网络的侦察时,出发点无疑是首先发现宿主主机.这个任务还可能包含嗅探和解析网络中数据包的能力. 几周前,我曾经谈到了如何使用Wireshark来进行数据包嗅探,但如果你没有wireshark,你如何去监控网络流量呢? 这一次,Python提供了几种解决方案,今天我将一步步演示如何建立一个UDP主机发现工具.首先,我们要看我们如何处理原始套接字来编写一个简单的嗅探器,它能够查看和解析网络数据包.然后,我们将在子网内多线程运行该进程,结果将在我

python之socket模块详解--小白博客

主要是创建一个服务端,在创建服务端的时候,主要步骤如下:创建socket对象socket——>绑定IP地址和端口bind——>监听listen——>得到请求accept——>接收请求recv——>发送信息send——>关闭close客户端代码就是连接服务器,接收和发送消息,具体流程如下:创建socket对象socket——>connet连接服务器——>获取消息recv——>发送消息send——关闭close 1.服务端代码 #服务器端 import s

Python学习——socket 模块

http://www.cnblogs.com/alex3714/articles/5830365.html 可以看的内容 socket sever #! /usr/bin/env python # -*- coding:utf-8 -*- # Author Ian Ying # mail: [email protected] import socket import os server = socket.socket() server.bind(('localhost', 6969)) #绑定被