python udp sniffer主机发现工具

思想:

   基于udp协议向遍历子网内所有ip地址,发送udp数据包到一个关闭的端口(你认为可能关闭的端口,如不放心可指定多个),如果受到回应的ICMP包说明此主机存在。udp发送数据包开销比较小。可以在此基础上增减namp调用的功能,完整扫描发现的主机上的端口。

使用的python模块:netaddr,socket,ctypes,struct,threading,time,os

#!/usr/bin/python
from netaddr import IPAddress,IPNetwork
import time
import threading
from socket import *
from ctypes import *
import struct
import os

定义两个关键类,利用struct模块,构建类似c语言struct结构。将一个自己构建的列表命名为_fields_,根据ip头的结构,ip类的_fields_列表为:

_fields_=[
        ("ihl",         c_ubyte,4),
        ("version",     c_ubyte,4),
        ("tos",         c_ubyte),
        ("len",         c_ushort),
        ("id",          c_ushort),
        ("offset",      c_ushort),
        ("ttl",         c_ubyte),
        ("protocol_num",c_ubyte),
        ("sum",         c_ushort),
        ("src",         c_uint),
        ("dst",         c_uint),]
其中每一个元组代表结构中的一个变量,字符串是变量名,后面是类型。c_ubyte,4代表4位。

定义__new__方法,在类刚创建的时候将传入的socket_buffer数据写入结构。

def __new__(self, socket_buffer = None):
                return self.from_buffer_copy(socket_buffer)
定义__init__方法,其执行的时候,__new__以经执行。在其中创建一个protocol_map字典,用于辨别协议类型。利用struct模块的pack函数将ip包中的源地址src和目的地址dst由网络大段字节顺序转化成小端字节顺序当作参数传给socket.inet_ntoa函数转化成十进制字符串赋值给类属性。随后利用字典指明协议类型。或是直接str

def __init__(self, socket_buffer = None):
                protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
                self.src_address = inet_ntoa(struct.pack("<L", self.src))
                self.dst_address = inet_ntoa(struct.pack("<L", self.dst))
                try:
                        self.protocol = protocol_map[self.protocol_num]
                except:
                        self.protocol = str(protocol_num)
ICMP处理类(同ip)

class ICMP(Structure):
        _fields_=[
        ("type",        c_ubyte),
        ("code",        c_ubyte),
        ("checksum",    c_ushort),
        ("unused",      c_ushort),
        ("next_hop_mtu",c_ushort),]
        def __new__(self, raw_buffer = None):
                return self.from_buffer_copy(raw_buffer)
        def __init__(self, raw_buffer = None):
                pass
def udp_sender(subnet, message):
        #time.sleep(5)
        sendudp = socket(AF_INET, SOCK_DGRAM)
        for ip in IPNetwork(subnet):
                try:
                        sendudp.sendto(message, ("%s"%ip,65532))
                except:
                        pass
定义udp_sender函数,利用netaddr模块遍历子网ip发送udp数据包,

首先创建基于udp协议的socket套接字。其次,利用IPNetwork可以遍历子网ip。向65532端口(任意关闭端口)发送验证字符串(用于检验受到的icmp包是回应你发送的数据包)

def udp_sender(subnet, message):
        #time.sleep(5)
        sendudp = socket(AF_INET, SOCK_DGRAM)
        for ip in IPNetwork(subnet):
                try:
                        sendudp.sendto(message, ("%s"%ip,65532))
                except:
                        pass

本程序只有两个线程如需要可增加,分块扫描。在主线程中处理数据包,字线程用于发送udp数据包。

t = threading.Thread(target = udp_sender, args = (subnet, message,))
t.start()
其中传入的subnet为”10.114.234.0/24“我们学校宿舍楼5楼子网之一,实际使用的时候可以改成其它的。message为验证字符串“PYTHONSNIFFER”

主线程数据包处理是关键地方,构建一个原始套接字用于监听,首先根据os.name的值是否为nt判断是否为windows系统。因为原始套接字在linux下只能接受icmp包。然后根据判断设置原始套接字接受数据包的协议类型。

f os.name == "nt":
        sock_protocol = IPPROTO_IP
else:
        sock_protocol = IPPROTO_ICMP
接着创建原始套机字。socket.socket()函数的第二个参数为SOCK_RAW即建立了原始套接字。注意原始套接字需要在root下创建。随后bind,setsockopt函数设置套接字保留ip头。sniffer = socket(AF_INET, SOCK_RAW, sock_protocol)
sniffer.bind((host, port))
sniffer.setsockopt(IPPROTO_IP, IP_HDRINCL, 1)
如果是windows系统再利用ioctl打开监听模式

if os.name == "nt":
        sniffer.ioctl(SIO_RCVALL, RCVALL_ON)
随后循环接受数据并实例化IP,ICMP对象。因为ip头是接受的保留ip头的数据包的前20字节。8字节icmp头需要先去掉20字节ip头,切片从20开始操作。先实例化IP对象处理ip头,如果接受为icmp包,再实例化ICMP对象,判断IP头src是否在子网范围内,然后判断头部之后的数据是否为验证字符串PYTHONSNIFFER。

while True:
                data = sniffer.recvfrom(65565)[0]
                ip_header = IP(data[:20])
        #       print "protocol: %s -> %s"%(ip_header.src_address, ip_header.dst_address)
                if ip_header.protocol == "ICMP":
                        offset = ip_header.ihl*4
                        icmp_header = ICMP(data[offset:offset+sizeof(ICMP)])
                #       print "ICMP  Type: %d, Code: %d" % (icmp_header.type, icmp_header.code)         
                        if icmp_header.code == 3 and icmp_header.type == 3:
                                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                                        if data[len(data) - len(message):] == "PYTHONSNIFFER":
                                                print "Host Up: %s"%(ip_header.src_address)

最后如果遇到键盘异常退出,关闭windows的混杂模式。

扫描10.114.234.0效果:

完整源代码如下:

#!/usr/bin/python
from netaddr import IPAddress,IPNetwork
import time
import threading
from socket import *
from ctypes import *
import struct
import os
#host = "127.0.0.1"
host = "10.114.234.133"
port = 0
subnet = "10.114.234.0/24"
message = "PYTHONSNIFFER"

class IP(Structure):
    _fields_=[
    ("ihl",        c_ubyte,4),
    ("version",    c_ubyte,4),
    ("tos",        c_ubyte),
    ("len",        c_ushort),
    ("id",        c_ushort),
    ("offset",    c_ushort),
    ("ttl",        c_ubyte),
    ("protocol_num",c_ubyte),
    ("sum",        c_ushort),
    ("src",        c_uint),
    ("dst",        c_uint),]
    def __new__(self, socket_buffer = None):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self, socket_buffer = None):
        protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
        self.src_address = inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = inet_ntoa(struct.pack("<L", self.dst))
        try:
            self.protocol = protocol_map[self.protocol_num]
        except:
            self.protocol = str(protocol_num)
class ICMP(Structure):
    _fields_=[
    ("type",    c_ubyte),
    ("code",    c_ubyte),
    ("checksum",    c_ushort),
    ("unused",    c_ushort),
    ("next_hop_mtu",c_ushort),]
    def __new__(self, raw_buffer = None):
        return self.from_buffer_copy(raw_buffer)
    def __init__(self, raw_buffer = None):
        pass
def udp_sender(subnet, message):
    #time.sleep(5)
    sendudp = socket(AF_INET, SOCK_DGRAM)
    for ip in IPNetwork(subnet):
        try:
            sendudp.sendto(message, ("%s"%ip,65532))
        except:
            pass
t = threading.Thread(target = udp_sender, args = (subnet, message,))
t.start()
if os.name == "nt":
    sock_protocol = IPPROTO_IP
else:
    sock_protocol = IPPROTO_ICMP
sniffer = socket(AF_INET, SOCK_RAW, sock_protocol)
sniffer.bind((host, port))
sniffer.setsockopt(IPPROTO_IP, IP_HDRINCL, 1)
if os.name == "nt":
    sniffer.ioctl(SIO_RCVALL, RCVALL_ON)
try:
    while True:
        data = sniffer.recvfrom(65565)[0]
        ip_header = IP(data[:20])
    #    print "protocol: %s -> %s"%(ip_header.src_address, ip_header.dst_address)
        if ip_header.protocol == "ICMP":
            offset = ip_header.ihl*4
            icmp_header = ICMP(data[offset:offset+sizeof(ICMP)])
        #    print "ICMP  Type: %d, Code: %d" % (icmp_header.type, icmp_header.code)        
            if icmp_header.code == 3 and icmp_header.type == 3:
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):
                    if data[len(data) - len(message):] == "PYTHONSNIFFER":
                        print "Host Up: %s"%(ip_header.src_address)
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(SIO_RCVALL, RCVALL_OFF)

  

时间: 2024-11-05 15:45:17

python udp sniffer主机发现工具的相关文章

【实战小项目】python开发自动化运维工具--批量操作主机

有很多开源自动化运维工具都很好用如ansible/salt stack等,完全不用重复造轮子.只不过,很多运维同学学习Python之后,苦于没小项目训练,本篇演示用Python写一个批量操作主机的工具,大家空余时候可以试着写写,完善完善. 1 思路分析 在运维工作中,古老的方式部署环境.上线代码可能都需要手动在服务器上敲命令,不胜其烦.所以,脚本,自动化工具等还是很有必要的.我觉得一个批量操作工具应该考虑以下几点: (1)本质上,就是到远程主机上执行命令并返回结果. (2)做到批量.也就是要并发

Python学习笔记——进阶篇【第八周】———FTP断点续传作业&amp;批量主机管理工具

主机管理之paramiko模块学习 http://www.cnblogs.com/wupeiqi/articles/5095821.html 作业1:用socketserver继续完善FTP作业 作业2:开发一个批量主机管理工具 需求: 可以对机器进行分组 可以对指定的一组或多组机器执行批量命令,分发文件(发送\接收) 纪录操作日志 作业参考链接http://www.cnblogs.com/alex3714/articles/5227251.html

Python Udp Socket

socket(套接字),传输层通信的端点,由IP和端口号组成(IP,Port),可以通过socket精确地找到服务器上的进程并与之通信 python2.6实现,基于AF_INET(网络套接字) 类型SOCKET_STREAM(TCP套接字),SOCKET_DGRAM(UDP套接字) UDP socket实现较TCP要简单,没有建立连接的过程,服务端无限循环接收数据,处理数据返回,客户端也无需建立通信连接, 直接发送数据接收数据即可 UDP socket通信逻辑 UDP服务端:创建socket>>

【转载】90行python搭一个音乐搜索工具 —— Song Finder

90行python搭一个音乐搜索工具 —— Song Finder Jul 23, 20153 minute read 之前一段时间读到了这篇博客,其中描述了作者如何用java实现国外著名音乐搜索工具shazam的基本功能.其中所提到的文章又将我引向了关于shazam的一篇论文及另外一篇博客.读完之后发现其中的原理并不十分复杂,但是方法对噪音的健壮性却非常好,出于好奇决定自己用python自己实现了一个简单的音乐搜索工具—— Song Finder, 它的核心功能被封装在 SFEngine 中,

用Python编写博客导出工具

用Python编写博客导出工具 罗朝辉 (http://kesalin.github.io/) CC 许可,转载请注明出处 写在前面的话 我在 github 上用 octopress 搭建了个人博客,octopress 使用Markdown语法编写博文.之前我在CSDN博客上也写过不少的技术博文,都说自己的孩子再丑也是个宝,所以就起了把CSDN博客里面的文章导出到个人博客上的念头.刚开始想找个工具把CSDN博客导出为xml或文本,然后再把xml或文本转换为Markdown博文.可惜搜了一下现有博

python之web路径扫描工具

# coding: UTF-8 import sys, os, time, httplibimport relist_http=[]  #http数组 def open_httptxt():  #打开TXT文本写入数组    try:        passlist = []        list_passlist=[]        xxx = file('http.txt', 'r')        for xxx_line in xxx.readlines():            #

Python UDP broadcast PermissionError: [Errno 13] Permission denied

/********************************************************************** * Python UDP broadcast PermissionError: [Errno 13] Permission denied * 说明: * 使用Python3做UDP广播,遇到这个权限问题,当时就一阵惊讶,第一看 * 到这种无权限的问题. * * 2016-12-10 深圳 南山平山村 曾剑锋 ***********************

转载 Python 安装setuptools和pip工具操作方法(必看)

本文章转载自 脚本之家 http://www.jb51.net  感谢! setuptools模块和pip模块是python进行第三方库扩展的极重要工具,例如我们在需要安装一些爬虫或者数据分析的包时就可以使用pip install命令来直接安装这些包了,因此pip工具一定要提前安装. 一.安装setuptools 在python交互界面执行如下命令,脚本我单独拿出来: ? 1 2 3 4 from urllib import urlopen data = urlopen('http://peak

Python黑帽编程1.3 Python运行时与包管理工具

Python黑帽编程1.3  Python运行时与包管理工具 0.1  本系列教程说明 本系列教程,采用的大纲母本为<Understanding Network Hacks Attack and Defense with Python>一书,为了解决很多同学对英文书的恐惧,解决看书之后实战过程中遇到的问题而作.由于原书很多地方过于简略,笔者根据实际测试情况和最新的技术发展对内容做了大量的变更,当然最重要的是个人偏好.教程同时提供图文和视频教程两种方式,供不同喜好的同学选择. 0.2 前言 前两