OSSIM传感器中Agent传送机制初探

在OSSIM传感器在通过GET框架实现OSSIM代理和OSSIM服务器之间通信协议和数据格式的转换。下面我们简要看一下ossim-agent脚本:
#!/usr/bin/python -OOt
import sys
sys.path.append(‘/usr/share/ossim-agent/‘)
sys.path.append(‘/usr/local/share/ossim-agent/‘)
from ossim_agent.Agent import Agent
agent = Agent()
agent.main()
这里需要GET作为OSSIM代理向OSSIM服务器输送数据。实现紧密整合所需的两个主要操作是“生成”(或)OSSIM兼容事件的“映射Mapping”)和此类数据向OSSIM的“传输”服务器。它负责此类操作的GET框架的两个组件是EventHandler和Sender Agent,如图1所示。

图1 将Get框架内容集成到OSSIM

Event Handler的主要任务是映射数据源插件采集的事件到SIEM实例警报的OSSIM标准化事件格式。为了执行这样的过程,原始消息经历由RAW LOG转换为现有归一化数据字段格式的一个转变;在上图中我们将这些机制表示为“归一化Normalization”和“OSSIM消息”。部分日志归一化代码:
from Logger import Logger
from time import mktime, strptime
logger = Logger.logger
class Event:
EVENT_TYPE = ‘event‘
EVENT_ATTRS = [
"type",
"date",
"sensor",
"interface",
"plugin_id",
"plugin_sid",
"priority",
"protocol",
"src_ip",
"src_port",
"dst_ip",
"dst_port",
"username",
"password",
"filename",
"userdata1",
"userdata2",
"userdata3",
"userdata4",
"userdata5",
"userdata6",
"userdata7",
"userdata8",
"userdata9",
"occurrences",
"log",
"data",
"snort_sid", # snort specific
"snort_cid", # snort specific
"fdate",
"tzone"
]

def __init__(self):
    self.event = {}
    self.event["event_type"] = self.EVENT_TYPE

def __setitem__(self, key, value):

    if key in self.EVENT_ATTRS:
        self.event[key] = self.sanitize_value(value)
        if key == "date":
            # The date in seconds anf fdate as string
            self.event["fdate"]=self.event[key]
            try:
                self.event["date"]=int(mktime(strptime(self.event[key],"%Y-%m-%d %H:%M:%S")))
            except:
                logger.warning("There was an error parsing date (%s)" %                    (self.event[key]))
    elif key != ‘event_type‘:
        logger.warning("Bad event attribute: %s" % (key))

def __getitem__(self, key):
    return self.event.get(key, None)
# 事件表示
def __repr__(self):
    event = self.EVENT_TYPE
    for attr in self.EVENT_ATTRS:
        if self[attr]:
            event += ‘ %s="%s"‘ % (attr, self[attr])
    return event + "\n"
# 返回内部哈希值
def dict(self):
    return self.event

def sanitize_value(self, string):
    return str(string).strip().replace("\"", "\\\"").replace("‘", "")

class EventOS(Event):
EVENT_TYPE = ‘host-os-event‘
EVENT_ATTRS = [
"host",
"os",
"sensor",
"interface",
"date",
"plugin_id",
"plugin_sid",
"occurrences",
"log",
"fdate",
]

class EventMac(Event):
EVENT_TYPE = ‘host-mac-event‘
EVENT_ATTRS = [
"host",
"mac",
"vendor",
"sensor",
"interface",
"date",
"plugin_id",
"plugin_sid",
"occurrences",
"log",
"fdate",
]

class EventService(Event):
EVENT_TYPE = ‘host-service-event‘
EVENT_ATTRS = [
"host",
"sensor",
"interface",
"port",
"protocol",
"service",
"application",
"date",
"plugin_id",
"plugin_sid",
"occurrences",
"log",
"fdate",
]

class EventHids(Event):
EVENT_TYPE = ‘host-ids-event‘
EVENT_ATTRS = [
"host",
"hostname",
"hids_event_type",
"target",
"what",
"extra_data",
"sensor",
"date",
"plugin_id",
"plugin_sid",
"username",
"password",
"filename",
"userdata1",
"userdata2",
"userdata3",
"userdata4",
"userdata5",
"userdata6",
"userdata7",
"userdata8",
"userdata9",
"occurrences",
"log",
"fdate",
]

class WatchRule(Event):

EVENT_TYPE = ‘event‘
EVENT_ATTRS = [
    "type",
"date",
"fdate",
"sensor",
"interface",
"src_ip",
"dst_ip",
"protocol",
    "plugin_id",
    "plugin_sid",
    "condition",
    "value",
    "port_from",
    "src_port",
    "port_to",
    "dst_port",
    "interval",
    "from",
    "to",
    "absolute",
"log",
    "userdata1",
    "userdata2",
    "userdata3",
    "userdata4",
    "userdata5",
    "userdata6",
    "userdata7",
    "userdata8",
    "userdata9",
    "filename",
    "username",
]

class Snort(Event):
EVENT_TYPE = ‘snort-event‘
EVENT_ATTRS = [
"sensor",
"interface",
"gzipdata",
"unziplen",
"event_type",
"plugin_id",
"type",
"occurrences"
]
日志编码代码:
import threading, time
from Logger import Logger
logger = Logger.logger
from Output import Output
import Config
import Event
from Threshold import EventConsolidation
from Stats import Stats
from ConnPro import ServerConnPro
class Detector(threading.Thread):
def init(self, conf, plugin, conn):

    self._conf = conf
    self._plugin = plugin
    self.os_hash = {}
    self.conn = conn
    self.consolidation = EventConsolidation(self._conf)
    logger.info("Starting detector %s (%s).." %                 (self._plugin.get("config", "name"),
                 self._plugin.get("config", "plugin_id")))
    threading.Thread.__init__(self)
def _event_os_cached(self, event):
    if isinstance(event, Event.EventOS):
        import string
        current_os = string.join(string.split(event["os"]), ‘ ‘)
        previous_os = self.os_hash.get(event["host"], ‘‘)
        if current_os == previous_os:
            return True
        else:
            # Fallthrough and add to cache
            self.os_hash[event["host"]] =                 string.join(string.split(event["os"]), ‘ ‘)
    return False
def _exclude_event(self, event):

    if self._plugin.has_option("config", "exclude_sids"):
        exclude_sids = self._plugin.get("config", "exclude_sids")
        if event["plugin_sid"] in Config.split_sids(exclude_sids):
            logger.debug("Excluding event with " +                "plugin_id=%s and plugin_sid=%s" %                (event["plugin_id"], event["plugin_sid"]))
            return True
    return False

def _thresholding(self):
    self.consolidation.process()
def _plugin_defaults(self, event):
    # 从配置文件中获取默认参数
    if self._conf.has_section("plugin-defaults"):

    # 1) 日期
        default_date_format = self._conf.get("plugin-defaults",
                                             "date_format")
        if event["date"] is None and default_date_format and            ‘date‘ in event.EVENT_ATTRS:
            event["date"] = time.strftime(default_date_format,
                                          time.localtime(time.time()))
    # 2) 传感器
        default_sensor = self._conf.get("plugin-defaults", "sensor")
        if event["sensor"] is None and default_sensor and            ‘sensor‘ in event.EVENT_ATTRS:
            event["sensor"] = default_sensor
    # 3) 网络接口
        default_iface = self._conf.get("plugin-defaults", "interface")
        if event["interface"] is None and default_iface and            ‘interface‘ in event.EVENT_ATTRS:
            event["interface"] = default_iface
    # 4) 源IP
        if event["src_ip"] is None and ‘src_ip‘ in event.EVENT_ATTRS:
            event["src_ip"] = event["sensor"]
    # 5) 时区
        default_tzone = self._conf.get("plugin-defaults", "tzone")
        if event["tzone"] is None and ‘tzone‘ in event.EVENT_ATTRS:
            event["tzone"] = default_tzone

    # 6) sensor,source ip and dest != localhost
        if event["sensor"] in (‘127.0.0.1‘, ‘127.0.1.1‘):
            event["sensor"] = default_sensor

        if event["dst_ip"] in (‘127.0.0.1‘, ‘127.0.1.1‘):
            event["dst_ip"] = default_sensor

        if event["src_ip"] in (‘127.0.0.1‘, ‘127.0.1.1‘):
            event["src_ip"] = default_sensor
    # 检测日志的类型
    if event["type"] is None and ‘type‘ in event.EVENT_ATTRS:
        event["type"] = ‘detector‘
    return event
def send_message(self, event):
    if self._event_os_cached(event):
        return

    if self._exclude_event(event):
        return
    #对于一些空属性使用默认值。
    event = self._plugin_defaults(event)

    # 合并之前检查
    if self.conn is not None:
        try:
            self.conn.send(str(event))
        except:
            id = self._plugin.get("config", "plugin_id")
            c = ServerConnPro(self._conf, id)
            self.conn = c.connect(0, 10)
            try:
                self.conn.send(str(event))
            except:
                return
        logger.info(str(event).rstrip())
    elif not self.consolidation.insert(event):
        Output.event(event)
    Stats.new_event(event)
def stop(self):
    #self.consolidation.clear()
    pass

#在子类中重写
def process(self):
pass
def run(self):
self.process()
class ParserSocket(Detector):
def process(self):
self.process()
class ParserDatabase(Detector):
def process(self):
self.process()
… …

从上可以看出,传感器的归一化主要负责对每个LOG内数据字段进行重新编码,使其生成一个全新的可能用于发送到OSSIM服务器的完整事件。为达成这种目的GET框架中包含了一些特定的功能,以便将所有的功能转换需要BASE64转换的字段。“OSSIM消息”负责填充GET生成的原始事件中不存在的字段。所以上面讲的plugin_id、plugin_sid是用来表示日志消息来源类型和子类型,这也是生成SIEM事件的必填字段。为事件格式完整性考虑,有些时候在无法确认源或目标IP时,系统默认会采用0.0.0.0来填充该字段。

注意:这种必填字段我们可利用phpmyadmin工具查看OSSIM的MySQL数据库。
Sender Agent负责完成下面两个任务:
发送由GET收集并由事件格式化的事件发送到OSSIM服务器,这项任务由Event Hander创建的消息组成消息队列发送到消息中间件实现,时序图如图2所示。

图2序列图:从安全探测器的日志转换到OSSIM服务器事件

2)管理GET框架和OSSIM服务器之间的通信,通信端口为TCP 40001通过双向握手实现。归一化原始日志是规范化过程的一个重要环节,OSSIM在归一化处理日志的同时保留了原始日志,可用于日志归档,提供了一种从规范化事件中提取原始日志的手段。
经过归一化处理的EVENTS,存储到MySQL数据库中,如图3所示。接着就由关联引擎根据规则、优先级、可靠性等参数进行交叉关联分析,得出风险值并发出各种报警提示信息。

图3 日志存储机制
接下来我们再看个实例,下面是一段Apache、CiscoASA以及SSH的原始日志,如图4、图5、图6所示。

图4Apache原始日志

图5 Cisco ASA 原始日志

图6 SSH原始日志
通过过OSSIM归一化处理后的实际再通过Web前端展现给大家方便阅读的格。归一化处理后的事件和原始日志的对比方法我们在《开源安全运维平台OSSIM疑难解析:入门篇》一书中还会讲解。

图7 归一化处理以后的Apache访问日志

在图7所示的例子当中,仅使用了Userdata1和Userdata2,并没有用到Userdata3~Userdata9这些是扩展位,主要是为了预留给其他设备或服务使用,这里目标地址会标记成IP地址的形式,例如:Host192.168.11.160。实际上归一化处理这种操作发生在系统采集和存储事件之后,关联和数据分析之前,在SIEM工具中把采集过程中把数据转换成易读懂的格式,采用格式化的数据,能更容易理解。

原文地址:https://blog.51cto.com/chenguang/2439193

时间: 2024-08-23 20:14:39

OSSIM传感器中Agent传送机制初探的相关文章

ActiveMQ讯息传送机制以及ACK机制

http://blog.csdn.net/lulongzhou_llz/article/details/42270113 ActiveMQ消息传送机制以及ACK机制详解 AcitveMQ是作为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的. 一. ActiveMQ消息传送机制 Producer客户端使用来发送消息的, Consumer客户端用来消费消息:它们的协同中心就是ActiveMQ br

走近OSSIM传感器(Sensor)插件

走近OSSIM传感器(Sensor)插件 在上一篇博文介绍完OSSIM架构何组成,接着要介绍它"神秘"的插件,阅读插件前提示您熟练掌握正则表达式. Sensor启用插件列表 [plugins] apache=/etc/ossim/agent/plugins/apache.cfg nmap-monitor=/etc/ossim/agent/plugins/nmap-monitor.cfg ossec-single-line=/etc/ossim/agent/plugins/ossec-s

浅谈Linux中的信号机制(二)

首先谢谢 @小尧弟 这位朋友对我昨天夜里写的一篇<浅谈Linux中的信号机制(一)>的指正,之前的题目我用的“浅析”一词,给人一种要剖析内核的感觉.本人自知功力不够,尚且不能对着Linux内核源码评头论足.以后的路还很长,我还是一步一个脚印的慢慢走着吧,Linux内核这座山,我才刚刚抵达山脚下. 好了,言归正传,我接着昨天写下去.如有错误还请各位看官指正,先此谢过. 上篇末尾,我们看到了这样的现象:send进程总共发送了500次SIGINT信号给rcv进程,但是实际过程中rcv只接受/处理了1

ffmpeg的内部Video Buffer管理和传送机制

ffmpeg的内部Video Buffer管理和传送机制 本文主要介绍ffmpeg解码器内部管理Video Buffer的原理和过程,ffmpeg的Videobuffer为内部管理,其流程大致为:注册处理函数->帧级释放->帧级申请->清空. 1 注册get_buffer()和release_buffer() FFAPI_InitCodec() avcodec_alloc_context() avcodec_alloc_context2() avcodec_get_context_def

java反射机制初探

反射,reflection,听其名就像照镜子一样,可以看见自己也可以看见别人的每一部分.在java语言中这是一个很重要的特性.下面是来自sun公司官网关于反射的介绍: Reflection is a feature in the Java programming language. It allows an executing Java program to examine or "introspect" upon itself, and manipulate internal pro

ActiveMQ讯息传送机制以及ACK机制详解

[http://www.ylzx8.cn/ruanjiangongcheng/software-architecture-design/11922.html] AcitveMQ:消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不仅要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的. [ActiveMQ消息传送机制]Producer客户端用来发送消息的, Consumer客户端用来消费消息:它们的协同中心就是ActiveMQ broker,broker也

SQL Server 内存中OLTP内部机制概述(四)

----------------------------我是分割线------------------------------- 本文翻译自微软白皮书<SQL Server In-Memory OLTP Internals Overview>:http://technet.microsoft.com/en-us/library/dn720242.aspx 译者水平有限,如有翻译不当之处,欢迎指正. ----------------------------我是分割线---------------

cocos2dx 3.2中的物理引擎初探(一)

cocos2dx在设计之初就集成了两套物理引擎,它们是box2d和chipmunk.我目前使用的是最新版的cocos2dx 3.2.引擎中默认使用的是chipmunk,如果想要改使用box2d的话,需要修改对应的android工程或者是ios工程的配置文件. 在2.x版本的cocos中,使用物理引擎的步骤十分繁琐.但在3.x版本中变得非常方便了.我这次的学习目标是制作一个打砖块的小游戏. 首先,现在的Scene类提供了一个静态工厂方法,用以创造一个集成物理引擎的场景. Scene::initWi

关于js中的回收机制,通俗版

在前面的几篇文章中,我讲解过了js中的回收机制,但是对于当时的我来说,我自己对回收机制的这个概念也有些懵懵懂懂,现在对回收机制有了更深入的理解,所以特此发布此文给于总结,也好加深记忆. 如果你想学习闭包那么js中的回收机制是必不可少的,当然学习闭包除了需要理解js中的回收机制以外还需要了解其他的概念,我的其他文章有相关的说明,这里不做闭包的讲解. 为什么要有回收机制?why? 打个比方,我有一个内存卡,这个内存是8G的,我把文件,视频,音乐,都保存到了这个内存卡,随着我的储存的内容越来越多,这个