使用Python调用Nessus 接口实现自动化扫描

目录

  • 认证
  • 策略模板配置
    • 创建策略模板
  • 任务
    • 创建任务
    • 启动/停止任务
  • 获取扫描结果
    • 获取漏洞信息
    • 获取漏洞输出信息与漏洞知识库信息
  • 最后总结

@(目录)

之前在项目中需要接入nessus扫描器,研究了一下nessus的api,现在将自己的成果分享出来。
Nessus提供了丰富的二次开发接口,无论是接入其他系统还是自己实现自动化扫描,都十分方便。
同时Nessus也提供了完备的API文档,可以在 Settings->My Account->API Keys->API documentation

认证

nessus提供两种认证方式,第一种采用常规的登录后获取token的方式,在https://localhost:8834/api#/resources/session条目中可以找到这种方式,它的接口定义如下:

POST /session

{
    "username":{string},
    "password":{string}
}

输入正确的用户名和密码,登录成功后会返回一个token
···
{
"token": {string}
}
···

在后续请求中,将token放入请求头信息中请求头的key为X-Cookie,值为 token=xxxx,例如 :X-Cookie: token=5fa3d3fd97edcf40a41bb4dbdfd0b470ba45dde04ebc37f8;,下面是获取任务列表的例子

import requests
import json
def get_token(ip, port, username, password):
    url = "https://{0}:{1}/session".format(ip, port)
    post_data = {
        'username': username,
        'password': password
    }

    respon = requests.post(url, data=post_data, verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["token"]

def get_scan_list()
    # 这里ip和port可以从配置文件中读取或者从数据库读取,这里我省略了获取这些配置值得操作
    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            "X-Cookie":"token={0}".format(token),
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result

第二种方式是使用Nessus生成的API Key,这里我们可以依次点击 Settings->My Account->API Keys-->Generate按钮,生成一个key,后续使用时填入头信息中,还是以获取扫描任务列表作为例子

def get_scan_list()
    accessKey = "XXXXXX" #此处填入真实的内容
    secretKey = "XXXXXX" #此处填入真实内容

    url = "https://{ip}:{port}/scans".format(ip, port)
    token = get_token(ip, port, username, password)
    if token:
        header = {
            'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accessKey, secretkey=secretKey)
            "Content-Type":"application/json"
        }
        response = requests.get(url, headers=header, verify=False)
        if response.status_code == 200:
            result = json.loads(respon.text)
            return result

对比来看使用第二种明显方便一些,因此后续例子都采用第二种方式来呈现

策略模板配置

策略模板的接口文档在 https://localhost:8834/api#/resources/policies 中。

创建策略模板

创建策略模板使用的是 策略模板的create接口,它里面有一个必须填写的参数 uuid 这个参数是一个uuid值,表示以哪种现有模板进行创建。在创建之前需要先获取系统中可用的模板。获取的接口是 /editor/{type}/templates,type 可以选填policy或者scan。这里我们填policy

一般我们都是使用模板中的 Advanced 来创建,如下图

下面是获取该模板uuid的方法,主要思路是获取系统中所有模板,然后根据模板名称返回对应的uuid值

def get_nessus_template_uuid(ip, port, template_name = "advanced"):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/editor/scan/templates".format(ip=ip, port=port)
    response = requests.get(api, headers=header, verify=False)
    templates = json.loads(response.text)['templates']

    for template in templates:
        if template['name'] == template_name:
            return template['uuid']
    return None

有了这个id之后,下面来创建策略模板,这个接口的参数较多,但是很多参数都是选填项。

这个部分文档写的很简陋,很多参数不知道是干嘛用的,当时我为了搞清楚每个参数的作用,一个个的尝试,然后去界面上看它的效果,最后终于把我感兴趣的给弄明白了。
它的主体部分如下:
json { "uuid": {template_uuid}, "audits": { "feed": { "add": [ { "id": {audit_id}, "variables": { "1": {audit_variable_value}, "2": {audit_variable_value}, "3": {audit_variable_value} } } ] } }, "credentials": { "add": { {credential_category}: { {credential_name}: [ { {credential_input_name}: {string} } ] } } }, "plugins": { {plugin_family_name}: { "status": {string}, "individual": { {plugin_id}: {string} } } }, "scap": { "add": { {scap_category}: [ { {scap_input_name}: {string} } ] } }, "settings": { "acls": [ permission Resource ], //其他的减值对,这里我将他们都省略了 }
他们与界面上配置的几个大项有对应关系,能对应的上的我给做了标记,但是有的部分对应不上。

settings 是给策略模板做基础配置的,包括配置扫描的端口范围,服务检测范围等等。
credentials 是配置登录扫描的,主要包括 windows、ssh、telnet等等
plugins 配置扫描使用的插件,例如服务扫描版本漏洞等等

在settings中,对应关系如下图所示



下面是创建扫描策略模板的实际例子:

def create_template(ip, port, **kwargs): # kwargs 作为可选参数,用来配置settings和其他项
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }
    policys = {}

    # 这里 grouppolicy_set 存储的是策略模板中各个脚本名称以及脚本是否启用的信息
    for policy in grouppolicy_set:
        enabled = "enabled" if policy.enable else "disabled"
        policys[policy.name] = {
            "status": enabled
        }

    # settings里面的各小项必须得带上,否则会创建不成功
    "settings": {
        "name": template.name,
        "watchguard_offline_configs": "",
        "unixfileanalysis_disable_xdev": "no",
        "unixfileanalysis_include_paths": "",
        "unixfileanalysis_exclude_paths": "",
        "unixfileanalysis_file_extensions": "",
        "unixfileanalysis_max_size": "",
        "unixfileanalysis_max_cumulative_size": "",
        "unixfileanalysis_max_depth": "",
        "unix_docker_scan_scope": "host",
        "sonicos_offline_configs": "",
        "netapp_offline_configs": "",
        "junos_offline_configs": "",
        "huawei_offline_configs": "",
        "procurve_offline_configs": "",
        "procurve_config_to_audit": "Saved/(show config)",
        "fortios_offline_configs": "",
        "fireeye_offline_configs": "",
        "extremeos_offline_configs": "",
        "dell_f10_offline_configs": "",
        "cisco_offline_configs": "",
        "cisco_config_to_audit": "Saved/(show config)",
        "checkpoint_gaia_offline_configs": "",
        "brocade_offline_configs": "",
        "bluecoat_proxysg_offline_configs": "",
        "arista_offline_configs": "",
        "alcatel_timos_offline_configs": "",
        "adtran_aos_offline_configs": "",
        "patch_audit_over_telnet": "no",
        "patch_audit_over_rsh": "no",
        "patch_audit_over_rexec": "no",
        "snmp_port": "161",
        "additional_snmp_port1": "161",
        "additional_snmp_port2": "161",
        "additional_snmp_port3": "161",
        "http_login_method": "POST",
        "http_reauth_delay": "",
        "http_login_max_redir": "0",
        "http_login_invert_auth_regex": "no",
        "http_login_auth_regex_on_headers": "no",
        "http_login_auth_regex_nocase": "no",
        "never_send_win_creds_in_the_clear": "yes" if kwargs["never_send_win_creds_in_the_clear"] else "no",
        "dont_use_ntlmv1": "yes" if kwargs["dont_use_ntlmv1"] else "no",
        "start_remote_registry": "yes" if kwargs["start_remote_registry"] else "no",
        "enable_admin_shares": "yes" if kwargs["enable_admin_shares"] else "no",
        "ssh_known_hosts": "",
        "ssh_port": kwargs["ssh_port"],
        "ssh_client_banner": "OpenSSH_5.0",
        "attempt_least_privilege": "no",
        "region_dfw_pref_name": "yes",
        "region_ord_pref_name": "yes",
        "region_iad_pref_name": "yes",
        "region_lon_pref_name": "yes",
        "region_syd_pref_name": "yes",
        "region_hkg_pref_name": "yes",
        "microsoft_azure_subscriptions_ids": "",
        "aws_ui_region_type": "Rest of the World",
        "aws_us_east_1": "",
        "aws_us_east_2": "",
        "aws_us_west_1": "",
        "aws_us_west_2": "",
        "aws_ca_central_1": "",
        "aws_eu_west_1": "",
        "aws_eu_west_2": "",
        "aws_eu_west_3": "",
        "aws_eu_central_1": "",
        "aws_eu_north_1": "",
        "aws_ap_east_1": "",
        "aws_ap_northeast_1": "",
        "aws_ap_northeast_2": "",
        "aws_ap_northeast_3": "",
        "aws_ap_southeast_1": "",
        "aws_ap_southeast_2": "",
        "aws_ap_south_1": "",
        "aws_me_south_1": "",
        "aws_sa_east_1": "",
        "aws_use_https": "yes",
        "aws_verify_ssl": "yes",
        "log_whole_attack": "no",
        "enable_plugin_debugging": "no",
        "audit_trail": "use_scanner_default",
        "include_kb": "use_scanner_default",
        "enable_plugin_list": "no",
        "custom_find_filepath_exclusions": "",
        "custom_find_filesystem_exclusions": "",
        "reduce_connections_on_congestion": "no",
        "network_receive_timeout": "5",
        "max_checks_per_host": "5",
        "max_hosts_per_scan": "100",
        "max_simult_tcp_sessions_per_host": "",
        "max_simult_tcp_sessions_per_scan": "",
        "safe_checks": "yes",
        "stop_scan_on_disconnect": "no",
        "slice_network_addresses": "no",
        "allow_post_scan_editing": "yes",
        "reverse_lookup": "no",
        "log_live_hosts": "no",
        "display_unreachable_hosts": "no",
        "report_verbosity": "Normal",
        "report_superseded_patches": "yes",
        "silent_dependencies": "yes",
        "scan_malware": "no",
        "samr_enumeration": "yes",
        "adsi_query": "yes",
        "wmi_query": "yes",
        "rid_brute_forcing": "no",
        "request_windows_domain_info": "no",
        "scan_webapps": "no",
        "start_cotp_tsap": "8",
        "stop_cotp_tsap": "8",
        "modbus_start_reg": "0",
        "modbus_end_reg": "16",
        "hydra_always_enable": "yes" if kwargs["hydra_always_enable"] else "no",
        "hydra_logins_file": "" if kwargs["hydra_logins_file"] else kwargs["hydra_logins_file"], # 弱口令文件需要事先上传,后面会提到上传文件接口
        "hydra_passwords_file": "" if kwargs["hydra_passwords_file"] else kwargs["hydra_passwords_file"],
        "hydra_parallel_tasks": "16",
        "hydra_timeout": "30",
        "hydra_empty_passwords": "yes",
        "hydra_login_as_pw": "yes",
        "hydra_exit_on_success": "no",
        "hydra_add_other_accounts": "yes",
        "hydra_postgresql_db_name": "",
        "hydra_client_id": "",
        "hydra_win_account_type": "Local accounts",
        "hydra_win_pw_as_hash": "no",
        "hydra_cisco_logon_pw": "",
        "hydra_web_page": "",
        "hydra_proxy_test_site": "",
        "hydra_ldap_dn": "",
        "test_default_oracle_accounts": "no",
        "provided_creds_only": "yes",
        "smtp_domain": "example.com",
        "smtp_from": "[email protected]",
        "smtp_to": "[email protected][AUTO_REPLACED_IP]",
        "av_grace_period": "0",
        "report_paranoia": "Normal",
        "thorough_tests": "no",
        "detect_ssl": "yes",
        "tcp_scanner": "no",
        "tcp_firewall_detection": "Automatic (normal)",
        "syn_scanner": "yes",
        "syn_firewall_detection": "Automatic (normal)",
        "wol_mac_addresses": "",
        "wol_wait_time": "5",
        "scan_network_printers": "no",
        "scan_netware_hosts": "no",
        "scan_ot_devices": "no",
        "ping_the_remote_host": "yes",
        "tcp_ping": "yes",
        "icmp_unreach_means_host_down": "no",
        "test_local_nessus_host": "yes",
        "fast_network_discovery": "no",

        "arp_ping": "yes" if kwargs["arp_ping"] else "no",
        "tcp_ping_dest_ports": kwargs["tcp_ping_dest_ports"],
        "icmp_ping": "yes" if kwargs["icmp_ping"] else "no",
        "icmp_ping_retries": kwargs["icmp_ping_retries"],
        "udp_ping": "yes" if kwargs["udp_ping"] else "no",
        "unscanned_closed": "yes" if kwargs["unscanned_closed"] else "no",
        "portscan_range": kwargs["portscan_range"],
        "ssh_netstat_scanner": "yes" if kwargs["ssh_netstat_scanner"] else "no",
        "wmi_netstat_scanner": "yes" if kwargs["wmi_netstat_scanner"] else "no",
        "snmp_scanner": "yes" if kwargs["snmp_scanner"] else "no",
        "only_portscan_if_enum_failed": "yes" if kwargs["only_portscan_if_enum_failed"] else "no",
        "verify_open_ports": "yes" if kwargs["verify_open_ports"] else "no",
        "udp_scanner": "yes" if kwargs["udp_scanner"] else "no",
        "svc_detection_on_all_ports": "yes" if kwargs["svc_detection_on_all_ports"] else "no",
        "ssl_prob_ports": "Known SSL ports" if kwargs["ssl_prob_ports"] else "All ports",
        "cert_expiry_warning_days": kwargs["cert_expiry_warning_days"],
        "enumerate_all_ciphers": "yes" if kwargs["enumerate_all_ciphers"] else "no",
        "check_crl": "yes" if kwargs["check_crl"] else "no",
   }

    credentials = {
            "add": {
                "Host": {
                    "SSH": [],
                    "SNMPv3": [],
                    "Windows": [],
                },
                "Plaintext Authentication": {
                    "telnet/rsh/rexec": []
                }
            }
        }
        try:
            if kwargs["snmpv3_username"] and kwargs["snmpv3_port"] and kwargs["snmpv3_level"]:
                level = kwargs["snmpv3_level"]
                if level == NessusSettings.LOW:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "No authentication and no privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"]
                    })
                elif level == NessusSettings.MID:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication without privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"][1]],
                        "auth_password": kwargs["snmpv3_auth_psd"]
                    })
                elif level == NessusSettings.HIGH:
                    credentials["add"]["Host"]["SNMPv3"].append({
                        "security_level": "Authentication and privacy",
                        "username": kwargs["snmpv3_username"],
                        "port": kwargs["snmpv3_port"],
                        "auth_algorithm": NessusSettings.AUTH_ALG[kwargs["snmpv3_auth"]][1],
                        "auth_password": kwargs["snmpv3_auth_psd"],
                        "privacy_algorithm": NessusSettings.PPIVACY_ALG[kwargs["snmpv3_hide"]][1],
                        "privacy_password": kwargs["snmpv3_hide_psd"]
                    })

            if kwargs["ssh_username"] and kwargs["ssh_psd"]:
                credentials["add"]["Host"]["SSH"].append(
                    {
                        "auth_method": "password",
                        "username": kwargs["ssh_username"],
                        "password": kwargs["ssh_psd"],
                        "elevate_privileges_with": "Nothing",
                        "custom_password_prompt": "",
                    })

            if kwargs["windows_username"] and kwargs["windows_psd"]:
                credentials["add"]["Host"]["Windows"].append({
                    "auth_method": "Password",
                    "username": kwargs["windows_username"],
                    "password": kwargs["windows_psd"],
                    "domain": kwargs["ssh_host"]
                })

            if kwargs["telnet_username"] and kwargs["telnet_password"]:
                credentials["add"]["Plaintext Authentication"]["telnet/rsh/rexec"].append({
                    "username": kwargs["telnet_username"],
                    "password": kwargs["telnet_password"]
                })

    data = {
            "uuid": get_nessus_template_uuid(terminal, "advanced"),
            "settings": settings,
            "plugins": policys,
            "credentials": credentials
        }

    api = "https://{0}:{1}/policies".format(ip, port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"), # 这里做一个转码防止在nessus端发生中文乱码
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        return data["policy_id"] # 返回策略模板的id,后续可以在创建任务时使用
    else:
        return None

策略还有copy、delete、config等操作,这里就不再介绍了,这个部分主要弄清楚各参数的作用,后面的这些接口使用的参数都是一样的

任务

任务部分的API 在https://localhost:8834/api#/resources/scans 中

创建任务

创建任务重要的参数如下说明如下:

  1. uuid: 创建任务时使用的模板id,这个id同样是我们上面说的系统自带的模板id
  2. name:任务名称
  3. policy_id:策略模板ID,这个是可选的,如果要使用上面我们自己定义的扫描模板,需要使用这个参数来指定,并且设置上面的uuid为 custom 的uuid,这个值表示使用用户自定义模板;当然如果就想使用系统提供的,这个字段可以不填
  4. text_targets:扫描目标地址,这个参数是一个数组,可以填入多个目标地址,用来一次扫描多个主机

创建任务的例子如下:

def create_task(task_name, policy_id, hosts): # host 是一个列表,存放的是需要扫描的多台主机
    uuid = get_nessus_template_uuid(terminal, "custom") # 获取自定义策略的uuid
    if uuid is None:
        return False

    data = {"uuid": uuid, "settings": {
        "name": name,
        "policy_id": policy_id,
        "enabled": True,
        "text_targets": hosts,
        "agent_group_id": []
    }}

    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/scans".format(ip=terminal.ip, port=terminal.port)
    response = requests.post(api, headers=header, data=json.dumps(data, ensure_ascii=False).encode("utf-8"),
                             verify=False)
    if response.status_code == 200:
        data = json.loads(response.text)
        if data["scan"] is not None:
            scan = data["scan"]
            # 新增任务扩展信息记录

            return scan["id"] # 返回任务id

启动/停止任务

启动任务的接口为 POST /scans/{scan_id}/launch scan_id 是上面创建任务返回的任务ID, 它有个可选参数 alt_targets,如果这个参数被指定,那么该任务可以扫描这个参数中指定的主机,而之前创建任务时指定的主机将被替代

停止任务的接口为: POST /scans/{scan_id}/stop
下面给出启动和停止任务的方法

def start_task(task_id, hosts):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    data = {
        "alt_targets": [hosts] # 重新指定扫描地址
    }

    api = "https://{ip}:{port}/scans/{scan_id}/launch".format(ip=ip, port=port, scan_id=scan_id)
    response = requests.post(api, data=data, verify=False, headers=header)
    if response.status_code != 200:
        return False
    else:
        return True

def stop_task(task_id):
    header = {
        'X-ApiKeys': 'accessKey={accesskey};secretKey={secretkey}'.format(accesskey=terminal.reserved1,
                                                                          secretkey=terminal.reserved2),
        'Content-type': 'application/json',
        'Accept': 'text/plain'}

    api = "https://{ip}:{port}/scans/{scan_id}/stop".format(ip=ip, port=port, task_id)
    response = requests.post(api, headers=header, verify=False)
    if response.status_code == 200 or response.status_code == 409: # 根据nessus api文档可以知道409 表示任务已结束
        return True

    return False

获取扫描结果

使用接口 GET /scans/{scan_id} 可以获取最近一次扫描的任务信息,从接口文档上看,它还可以获取某次历史扫描记录的信息,如果不填这个参数,接口中会返回所有历史记录的id。如果不填历史记录id,那么会返回最近一次扫描到的漏洞信息,也就是说新扫描到的信息会把之前的信息给覆盖

下面是返回信息的部分说明

{
    "info": {
        "edit_allowed": {boolean},
        "status": {string}, //当前状态 completed 字符串表示结束,cancel表示停止
        "policy": {string},
        "pci-can-upload": {boolean},
        "hasaudittrail": {boolean},
        "scan_start": {string},
        "folder_id": {integer},
        "targets": {string},
        "timestamp": {integer},
        "object_id": {integer},
        "scanner_name": {string},
        "haskb": {boolean},
        "uuid": {string},
        "hostcount": {integer},
        "scan_end": {string},
        "name": {string},
        "user_permissions": {integer},
        "control": {boolean}
    },
    "hosts": [ //按主机区分的漏洞信息
        host Resource
    ],
    "comphosts": [
        host Resource
    ],
    "notes": [
        note Resource
    ],
    "remediations": {
        "remediations": [
            remediation Resource
        ],
        "num_hosts": {integer},
        "num_cves": {integer},
        "num_impacted_hosts": {integer},
        "num_remediated_cves": {integer}
    },
    "vulnerabilities": [
        vulnerability Resource //本次任务扫描到的漏洞信息
    ],
    "compliance": [
        vulnerability Resource
    ],
    "history": [
        history Resource //历史扫描信息,可以从这个信息中获取历史记录的id
    ],
    "filters": [
        filter Resource
    ]
}

这个信息里面vulnerabilities和host里面都可以拿到漏洞信息,但是 vulnerabilities中是扫描到的所有漏洞信息,而host则需要根据id再次提交请求,也就是需要额外一次请求,但它是按照主机对扫描到的漏洞进行了分类。而使用vulnerabilities则需要根据漏洞信息中的host_id 手工进行分类

下面是获取任务状态的示例:

def get_task_status(task_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{task_id}".format(ip=ip, port=port,
                                                       task_id=task_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    hosts = data["hosts"]
    for host in hosts:
        get_host_vulnerabilities(scan_id, host["host_id"]) # 按主机获取漏洞信息

    if data["info"]["status"] == "completed" or data["info"]["status"] =='canceled':
        # 已完成,此时更新本地任务状态
    return 1, "OK"

获取漏洞信息

在获取任务信息中,已经得到了本次扫描中发现的弱点信息了,只需要我们解析这个json。它具体的内容如下:

"host_id": {integer}, //主机id
"host_index": {string},
"hostname": {integer},//主机名称
"progress": {string}, //扫描进度
"critical": {integer}, //危急漏洞数
"high": {integer}, //高危漏洞数
"medium": {integer}, //中危漏洞数
"low": {integer}, //低危漏洞数
"info": {integer}, //相关信息数目
"totalchecksconsidered": {integer},
"numchecksconsidered": {integer},
"scanprogresstotal": {integer},
"scanprogresscurrent": {integer},
"score": {integer}

根据主机ID可以使用 GET /scans/{scan_id}/hosts/{host_id} 接口获取主机信息,它需要两个参数,一个是扫描任务id,另一个是主机id。
下面列举出来的是返回值得部分内容,只列举了我们感兴趣的部分:

{
    "info": {
        "host_start": {string},
        "mac-address": {string},
        "host-fqdn": {string},
        "host_end": {string},
        "operating-system": {string},
        "host-ip": {string}
    },

    "vulnerabilities": [
        {
            "host_id": {integer}, //主机id
            "hostname": {string}, //主机名称
            "plugin_id": {integer}, //策略id
            "plugin_name": {string}, //策略名称
            "plugin_family": {string}, //所属策略组
            "count": {integer}, //该种漏洞数
            "vuln_index": {integer},
            "severity_index": {integer},
            "severity": {integer}
        }
    ],
}

根据上面获取任务信息中得到的主机id和任务id,我们可以实现这个功能

def get_host_vulnerabilities(scan_id, host_id):
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    scan_history = ScanHistory.objects.get(id=scan_id)
    api = "https://{ip}:{port}/scans/{task_id}/hosts/{host_id}".format(ip=ip, port=port, task_id=scan_id, host_id=host_id)
    response = requests.get(api, headers=header, verify=False)
    if response.status_code != 200:
        return 2, "Data Error"

    data = json.loads(response.text)
    vulns = data["vulnerabilities"]
    for vuln in vulns:
        vuln_name = vuln["plugin_name"]
        plugin_id = vuln["plugin_id"] #插件id,可以获取更详细信息,包括插件自身信息和扫描到漏洞的解决方案等信息
        #保存漏洞信息

获取漏洞输出信息与漏洞知识库信息

我们在nessus web页面中可以看到每条被检测到的漏洞在展示时会有输出信息和知识库信息,这些信息也可以根据接口来获取

获取漏洞的知识库可以通过接口 GET /scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id} , 它的路径为: https://localhost:8834/api#/resources/scans/plugin-output

它返回的值如下:

{
    "info": {
        "plugindescription": {
            "severity": {integer}, //危险等级,从info到最后的critical依次为1,2,3,4,5
            "pluginname": {string},
            "pluginattributes": {
                "risk_information": {
                    "risk_factor": {string}
                },
                "plugin_name": {string}, //插件名称
                "plugin_information": {
                    "plugin_id": {integer},
                    "plugin_type": {string},
                    "plugin_family": {string},
                    "plugin_modification_date": {string}
                },
                "solution": {string}, //漏洞解决方案
                "fname": {string},
                "synopsis": {string},
                "description": {string} //漏洞描述
            },
            "pluginfamily": {string},
            "pluginid": {integer}
        }
    },
    "output": [
        plugin_output:{
           "plugin_output": {string}, //输出信息
           "hosts": {string}, //主机信息
           "severity": {integer},
           "ports": {} //端口信息
        }
    ]
}

有了这些信息,我们可以通过下面的代码获取这些信息:

def get_vuln_detail(scan_id, host_id, plugin_id)
    header = {
        "X-ApiKeys": "accessKey={accesskey};secretKey={secretkey}".format(accesskey=accesskey,
                                                                          secretkey=secretkey),
        "Content-Type": "application/json",
        "Accept": "text/plain"
    }

    api = "https://{ip}:{port}/scans/{scan_id}/hosts/{host_id}/plugins/{plugin_id}".format(ip=ip, port=port, scan_id=scan_id, host_id=host_id, plugin_id=plugin_id)
    response = requests.get(api, headers=header, verify=False)
    data = json.loads(response.text)
    outputs = data["outputs"]
    return outputs

最后总结

这篇文章我们主要介绍了nessus API从扫描设置到扫描任务创建、启动、停止、以及结果的获取的内容,当然nessus的api不止这些,但是最重要的应该是这些,如果能帮助各位解决手头上的问题自然是极好的,如果不能或者说各位朋友需要更细致的控制,可以使用浏览器抓包的方式来分析它的请求和响应包。
在摸索时最好的两个帮手是浏览器 F12工具栏中的 network和nessus api文档页面上的test工具了。

我们可以先按 f12 打开工具并切换到network,然后在页面上执行相关操作,观察发包即可发现该如何使用这些API,因为Nessus Web端在操作时也是使用API。如下图:

或者可以使用文档中的test工具,例如下面是测试 获取插件输出信息的接口

原文地址:https://www.cnblogs.com/lanuage/p/12183229.html

时间: 2024-10-24 05:52:51

使用Python调用Nessus 接口实现自动化扫描的相关文章

Python调用webservice接口

python调用webservice接口,这里我们使用suds包 如果没有直接cmd命令安装subs包 pip suds install 安装成功会有提示成功字样. ------------------------------ 首先导入包 先查看一下webservice提供哪些类型及方法,之后我们才知道怎么调用,传哪些参数. from suds.client import Client from suds.transport.https import HttpAuthenticated #web

python调用ice接口

今天用python调用ice接口,遇到如下提示 ImportError: No module named Ice 解决方案是 set PYTHONPATH=C:\Program Files\ZeroC\Ice-3.4.1\python 然后又提示 import IcePy ImportError: DLL load failed: The specified module could not be found. 在网上看到别人是如下配置即可 C:\> set PATH=C:\Python26;C

编写python调用dubbo接口hessian协议的例子

引子 今天有小伙伴问到了怎么用python调用dubbo的接口的方法,就随便写了这么一篇文章.其实dubbo接口可以使用loadrunner.jmeter等完成,最好是熟悉java语言的,那么编写起来就丝滑了很多哦 那么用python来调用其实也是很简单的,并不像大家想的那么复杂,基本3.4步就可以搞定,不要急,来看如何实现 接口说明 既然做接口测试,那接口的说明是必须的,问开发GG要,不要问从哪里来....大致包括如下内容: 接口地址 http://192.168.133.129:20880/

Python调用API接口的几种方式

相信做过自动化运维的同学都用过API接口来完成某些动作.API是一套成熟系统所必需的接口,可以被其他系统或脚本来调用,这也是自动化运维的必修课. 本文主要介绍python中调用API的几种方式,下面是python中会用到的库. - urllib2 - httplib2 - pycurl - requests urllib2 import urllib2, urllib github_url = 'https://api.github.com/user/repos' password_manage

Python的Web编程[2] -> WebService技术[0] -> 利用 Python 调用 WebService 接口

WebService技术 / WebService Technology 1 关于webservice / Constants WebService是一种跨编程语言和跨操作系统平台的远程调用技术. WebService主要由以下三种技术构成,XML+XSD,SOAP和WSDL XML+XSD: WebService采用HTTP协议传输数据,采用XML格式封装数据(即XML中说明调用远程服务对象的哪个方法,传递的参数是什么,以及服务对象的返回结果是什么).XML是WebService平台中表示数据

Python调用百度接口(情感倾向分析)和讯飞接口(语音识别、关键词提取)处理音频文件

本示例的过程是: 1. 音频转文本 2. 利用文本获取情感倾向分析结果 3. 利用文本获取关键词提取 首先是讯飞的语音识别模块.在这里可以找到非实时语音转写的相关文档以及 Python 示例.我略作了改动,让它可以对不同人说话作区分,并且作了一些封装. 语音识别功能 weblfasr_python3_demo.py 文件: 1 #!/usr/bin/env python 2 # -*- coding: utf-8 -*- 3 """ 4 讯飞非实时转写调用demo(语音识别)

python调用webservice接口之多参数传递

from suds.client import Client def func(url, operation, *args): '''接口调用''' client = Client(url) result = eval("client.service.%s" % operation)(*args)  #eval将字符串转换成变量使用 return result url = 'http://10.***.**.***:****/DaiDaiKan/DaiDaiKanService.asm

【Python】使用Python调用Rest接口

用SpringBoot写了一个几个简单的Rest接口,对一个计数器进行查询,设置和增加,具体可以看: https://www.cnblogs.com/wuyizuokan/p/11117294.html 废话不多说,直接上Python代码: 1 # -*- coding: utf-8 -*- 2 import json 3 4 import requests 5 6 REQUEST_URL = "http://localhost:8080/me/count" 7 HEADER = {'

python调用win32接口进行截图

python的版本及依赖的库的安装 #版本python 3.7.1 pip install pywin32==224 pip install numpy==1.15.3 pip install opencv-python==3.4.2.16 pip install opencv-contrib-python==3.4.2.16 pip install Pillow-PIL==0.1.dev0 对后台窗口截图 #对后台窗口截图 import win32gui, win32ui, win32con