python 批量ping服务器

最近在https://pypi.python.org/pypi/mping/0.1.2找到了一个python包,可以用它来批量ping服务器,它是中国的大神写的,支持单个服务器、将服务器IP写在txt或json里都可以。

具体用法有中文文档,感谢作者youfou

这里我改了几个字,方便我这种英文不好的同学使用

mping.py

#!/usr/bin/env python3
# coding: utf-8

import argparse
import ctypes
import json
import os
import random
import re
import select
import socket
import struct
import sys
import threading
import time

if sys.platform.startswith(‘win32‘):
    clock = time.clock
    run_as_root = ctypes.windll.shell32.IsUserAnAdmin() != 0
else:
    clock = time.time
    run_as_root = os.getuid() == 0

DEFAULT_DURATION = 3

EXIT_CODE_BY_USER = 1
EXIT_CODE_DNS_ERR = 2
EXIT_CODE_IO_ERR = 3

# Credit: https://gist.github.com/pyos/10980172
def chk(data):
    x = sum(x << 8 if i % 2 else x for i, x in enumerate(data)) & 0xFFFFFFFF
    x = (x >> 16) + (x & 0xFFFF)
    x = (x >> 16) + (x & 0xFFFF)
    return struct.pack(‘<H‘, ~x & 0xFFFF)

# From the same gist commented above, with minor modified.
def ping(addr, timeout=1, udp=not run_as_root, number=1, data=b‘‘):
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM if udp else socket.SOCK_RAW, socket.IPPROTO_ICMP) as conn:
        payload = struct.pack(‘!HH‘, random.randrange(0, 65536), number) + data

        conn.connect((addr, 80))
        conn.sendall(b‘\x08\0‘ + chk(b‘\x08\0\0\0‘ + payload) + payload)
        start = clock()

        while select.select([conn], [], [], max(0, start + timeout - clock()))[0]:
            data = conn.recv(65536)
            if data[20:] == b‘\0\0‘ + chk(b‘\0\0\0\0‘ + payload) + payload:
                return clock() - start

class PingResults(list):
    def __init__(self, multiply=1000):
        """
        A list to save ping results, and can be used to count min/avg/max, etc.
        :param multiply: Every valid result will be multiplied by this number.
        """
        super(PingResults, self).__init__()
        self.multiple = multiply

    def append(self, rtt):
        """
        To receive a ping result, accept a number for how long the single ping took, or None for timeout.
        :param rtt: The ping round-trip time.
        """
        if rtt is not None and self.multiple:
            rtt *= self.multiple
        return super(PingResults, self).append(rtt)

    @property
    def valid_results(self):
        return list(filter(lambda x: x is not None, self))

    @property
    def valid_count(self):
        return len(self.valid_results)

    @property
    def loss_rate(self):
        if self:
            return 1 - len(self.valid_results) / len(self)

    @property
    def min(self):
        if self.valid_results:
            return min(self.valid_results)

    @property
    def avg(self):
        if self.valid_results:
            return sum(self.valid_results) / len(self.valid_results)

    @property
    def max(self):
        if self.valid_results:
            return max(self.valid_results)

    @property
    def form_text(self):
        if self.valid_results:#调整结果数据之间的间隔
            return ‘{0.valid_count},          {0.loss_rate:.1%},       {0.min:.1f}/{0.avg:.1f}/{0.max:.1f}‘.format(self)
        elif self:
            return ‘不通‘
        else:
            return ‘EMPTY‘

    def __str__(self):
        return self.form_text

    def __repr__(self):
        return ‘{}({})‘.format(self.__class__.__name__, self.form_text)

class PingTask(threading.Thread):
    def __init__(self, host, timeout, interval):
        """
        A threading.Thread based class for each host to ping.
        :param host: a host name or ip address
        :param timeout: timeout for each ping
        :param interval: the max time to sleep between each ping
        """

        self.host = host

        if re.match(r‘(?:\d{1,3}\.){3}(?:\d{1,3})$‘, host):
            self.ip = host
        else:
            print(‘Resolving host: {}‘.format(host))
            try:
                self.ip = socket.gethostbyname(host)
            except socket.gaierror:
                print(‘Unable to resolve host: {}‘.format(host))
                exit(EXIT_CODE_DNS_ERR)

        self.timeout = timeout
        self.interval = interval

        self.pr = PingResults()
        self.finished = False

        super(PingTask, self).__init__()

    def run(self):
        while not self.finished:
            try:
                rtt = ping(self.ip, timeout=self.timeout)
            except OSError:
                print(‘Unable to ping: {}‘.format(self.host))
                break
            self.pr.append(rtt)
            escaped = rtt or self.timeout
            if escaped < self.interval:
                time.sleep(self.interval - escaped)

    def finish(self):
        self.finished = True

def mping(hosts, duration=DEFAULT_DURATION, timeout=1.0, interval=0.0, quiet=False, sort=True):
    """
    Ping hosts in multi-threads, and return the ping results.
    :param hosts: A list of hosts, or a {name: host, ...} formed dict. A host can be a domain or an ip address
    :param duration: The duration which pinging lasts in seconds
    :param timeout: The timeout for each single ping in each thread
    :param interval: The max time to sleep between each single ping in each thread
    :param quiet: Do not print results while processing
    :param sort: The results will be sorted by valid_count in reversed order if this param is True
    :return: A list of PingResults
    """

    def results(_tasks, _sort=True):
        """
        Return the current status of a list of PingTask
        """
        r = list(zip(heads, [t.pr for t in _tasks]))
        if _sort:
            r.sort(key=lambda x: x[1].valid_count, reverse=True)
        return r

    if isinstance(hosts, list):
        heads = hosts
    elif isinstance(hosts, dict):
        heads = list(hosts.items())
        hosts = hosts.values()
    else:
        type_err_msg = ‘`hosts` should be a host list, or a {name: host, ...} formed dict.‘
        raise TypeError(type_err_msg)

    try:
        tasks = [PingTask(host, timeout, interval) for host in hosts]
    except KeyboardInterrupt:
        exit(EXIT_CODE_BY_USER)
    else:
        doing_msg = ‘Pinging {} hosts‘.format(len(hosts))
        if duration > 0:
            doing_msg += ‘ within {} seconds‘.format(int(duration))
        doing_msg += ‘...‘

        if quiet:
            print(doing_msg)

        for task in tasks:
            task.start()

        try:
            start = clock()
            while True:

                if duration > 0:
                    remain = duration + start - clock()
                    if remain > 0:
                        time.sleep(min(remain, 1))
                    else:
                        break
                else:
                    time.sleep(1)

                if not quiet:
                    print(‘\n{}\n{}‘.format(
                        results_string(results(tasks, True)[:10]),
                        doing_msg))

        except KeyboardInterrupt:
            print()
        finally:
            for task in tasks:
                task.finish()

            # Maybe not necessary?
            # for task in tasks:
            #     task.join()

        return results(tasks, sort)

def table_string(rows):
    rows = list(map(lambda x: list(map(str, x)), rows))
    widths = list(map(lambda x: max(map(len, x)), zip(*rows)))
    rows = list(map(lambda y: ‘ | ‘.join(map(lambda x: ‘{:{w}}‘.format(x[0], w=x[1]), zip(y, widths))), rows))
    rows.insert(1, ‘-|-‘.join(list(map(lambda x: ‘-‘ * x, widths))))
    return ‘\n‘.join(rows)

def results_string(prs):
    named = True if isinstance(prs[0][0], tuple) else False
    rows = [[‘IP‘, ‘有效次数  , 丢包率%  , min/avg/max‘]]
    if named:
        rows[0].insert(0, ‘name‘)

    for head, pr in prs:
        row = list()
        if named:
            row.extend(head)
        else:
            row.append(head)
        row.append(pr.form_text)
        rows.append(row)
    return table_string(rows)

def main():
    ap = argparse.ArgumentParser(
        description=‘Ping multiple hosts concurrently and find the fastest to you.‘,
        epilog=‘A plain text file or a json can be used as the -p/--path argument: ‘
               ‘1. Plain text file: hosts in lines; ‘
               ‘2. Json file: hosts in a list or a object (dict) with names.‘)

    ap.add_argument(
        ‘hosts‘, type=str, nargs=‘*‘,
        help=‘a list of hosts, separated by space‘)

    ap.add_argument(
        ‘-p‘, ‘--path‘, type=str, metavar=‘path‘,
        help=‘specify a file path to get the hosts from‘)

    ap.add_argument(
        ‘-d‘, ‘--duration‘, type=float, default=DEFAULT_DURATION, metavar=‘secs‘,
        help=‘the duration how long the progress lasts (default: {})‘.format(DEFAULT_DURATION))

    ap.add_argument(
        ‘-i‘, ‘--interval‘, type=float, default=0.0, metavar=‘secs‘,
        help=‘the max time to wait between pings in each thread (default: 0)‘)

    ap.add_argument(
        ‘-t‘, ‘--timeout‘, type=float, default=1.0, metavar=‘secs‘,
        help=‘the timeout for each single ping in each thread (default: 1.0)‘)

    ap.add_argument(
        ‘-a‘, ‘--all‘, action=‘store_true‘,
        help=‘show all results (default: top 10 results)‘
             ‘, and note this option can be overridden by -S/--no_sort‘)

    ap.add_argument(
        ‘-q‘, ‘--quiet‘, action=‘store_true‘,
        help=‘do not print results while processing (default: print the top 10 hosts)‘
    )

    ap.add_argument(
        ‘-S‘, ‘--do_not_sort‘, action=‘store_false‘, dest=‘sort‘,
        help=‘do not sort the results (default: sort by ping count in descending order)‘)

    args = ap.parse_args()

    hosts = None
    if not args.path and not args.hosts:
        ap.print_help()
    elif args.path:
        try:
            with open(args.path) as fp:
                hosts = json.load(fp)
        except IOError as e:
            print(‘Unable open file:\n{}‘.format(e))
            exit(EXIT_CODE_IO_ERR)
        except json.JSONDecodeError:
            with open(args.path) as fp:
                hosts = re.findall(r‘^\s*([a-z0-9\-.]+)\s*$‘, fp.read(), re.M)
    else:
        hosts = args.hosts

    if not hosts:
        exit()

    results = mping(
        hosts=hosts,
        duration=args.duration,
        timeout=args.timeout,
        interval=args.interval,
        quiet=args.quiet,
        sort=args.sort
    )

    if not args.all and args.sort:
        results = results[:10]

    if not args.quiet:
        print(‘\n********最终检查结果:*************************\n‘)

    print(results_string(results))

if __name__ == ‘__main__‘:
    main()

时间: 2024-12-15 13:45:09

python 批量ping服务器的相关文章

python批量修改服务器密码,同时保存execl表格

#!/usr/bin/env python #coding:utf8 #随机生成自定义长度密码 from random import choice import string,pickle def GenPassword(length=8,chars=string.ascii_letters+string.digits): return ''.join([choice(chars) for i in range(length)]) def passlist(r_user,c_user,ip_li

saltstack+python批量修改服务器密码

saltstack安装:略过 python脚本修改密码: 1 # -*- coding utf-8 -*- 2 import socket 3 import re 4 import os 5 import sys 6 import crypt 7 8 9 localIP = socket.gethostbyname(socket.gethostname()) 10 11 def password(): 12 localIP = socket.gethostbyname(socket.gethos

python 批量ping脚本不能用os.system

os.system(cmd)通过执行命令会得到返回值. ping通的情况下返回值为0. ping不通的情况: 1.请求超时,返回值1 2.无法访问目标主机,返回值为 0,和ping通返回值相同 所以建议不用os.system

使用python编写批量ping主机脚本

通过使用python,编写脚本批量ping主机,验证主机是否处于活动状态 #!/usr/bin/python #auther: Jacky #date: 2016-08-01 #filename: ping_ip.py import os,sys import subprocess,cmd def subping():     f = open("ip_list.txt","r")     lines = f.readlines()     for line in 

python批量同步web服务器代码核心程序

#!/usr/bin/env python #coding:utf8 import os,sys import md5,tab from mysql_co.my_db import set_mysql from ssh_co.ssh_connect import sshd from ssh_co.cfg.config import ssh_message,item_path from file import findfile def my_mysql(): db_file={} my_conne

批量Ping IP

刚刚接触Python 想做点什么 听说Python 在网络方便很厉害 后来总结如下: 第一:发现公司都固定IP 每次新来同事都要猜一个没有人用的IP  很费劲 第二:我们公司有的IP可以上QQ 有的不可以所以我每次也要换IP O(∩_∩)O 所以想到用Python 做一个批量Ping IP的工具 以至于方便于自 方便于人 少说多做  先上图 原理很简单 什么语言都可以实现的  献丑了 上代码 1 import subprocess 2 import string 3 import os 4 im

Ansible playbook 批量修改服务器密码 先普通后root用户

fsckzy Ansible playbook 批量修改服务器密码 客户的需求:修改所有服务器密码,密码规则为Rfv5%+主机名后3位 背景:服务器有CentOS6.7,SuSE9.10.11,root不能直接登录,需先登录普通用户,再切换到root. 首先在hosts 下面添加一个组[test],下面跟ip,每行一个. ansible基于ssh连接inventory中指定的远程主机时,将以此处的参数指定的属性进行: ansible_ssh_port 指定ssh 端口 ansible_ssh_u

python批量修改linux主机密码

+++++++++++++++++++++++++++++++++++++++++++标题:python批量修改Linux服务器密码时间:2019年2月24日内容:基于python实现批量修改linux主机密码.作者:Bruce重点:python使用SSH模块+循环操作,实现linux主机密码批量修改.注意:本脚本是在实验环境下生成的,如果线上使用,请先进性多次测试,防止出现问题.+++++++++++++++++++++++++++++++++++++++++++1. 包含文件1.1 脚本文件

PXE项目实战,通过编写脚本自动安装系统时批量部署服务器所需要的所有服务

由于上一篇是有关PXE的基础配置,所以有些过程省略,有不便的还请谅解. 案例需求:    1. 在局域网 192.168.1.0/24 内部署一台 PXE 装机服务器(CentOS 6.5系统)        操作系统:CentOS 6.5      ==> 关闭 iptables 防火墙.关闭SELinux机制,进图形桌面        网卡连接:VMnet1    ==> 关闭 NetworkManager 服务 2. 提供带图片背景.可选择的PXE启动菜单,支持为客户机裸机装配64位的