使用 ElasticSearch + LogStash + Kibana 来可视化网络流量

简介

ELK 套装包括 ElasticSearch、LogStash 和 Kibana。 其中,ElasticSearch 是一个数据搜索引擎(基于 Apache Lucene)+分布式 NoSQL 数据库;LogStash 是一个消息采集转换器,类似 Syslog,可以接收包括日志消息在内的多种数据格式,然后进行格式转换,发送给后端继续处理;Kibana 是一个 Web 前段,带有强大的数据分析、整理和可视化功能。

是不是听起来就觉得十分强大!

一般情况下,ELK 套装的工作流程为 原始数据 -> LogStash -> ElasticSearch -> Kibana

网络流量信息采集协议常见包括 sFlow 和 NetFlow,两种协议都支持采集需要的网络信息,发送到指定的采集器(例如 netflow-analyzersFlowTrend 和 softflowd)。其中,sFlow
一般基于采样,NetFlow 则可以基于所有网包信息获取更为精确的信息。

这里展示如何使用 ELK 套装 + NetFlow 可视化网络流量,基于 sFlow 的操作是类似的,也给出了关键步骤。

安装 ELK 套装

安装 ElasticSearch

步骤参考 http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/setup-repositories.html。可以选择源码编译或安装包安装。

以 CentOS 下使用安装包为例,首先添加证书。

$ rpm --import https://packages.elasticsearch.org/GPG-KEY-elasticsearch

添加源文件 /etc/yum.repos.d/elasticsearch.repo,内容为

[elasticsearch-1.4]
name=Elasticsearch repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/elasticsearch/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1

执行安装命令

$ yum update; yum install -y elasticsearch

启动 elasticsearch 服务,默认监听在 9200 端口,配置文件在 /etc/elasticsearch/elasticsearch.yml

$ /etc/init.d/elasticsearch restart

配置 elasticsearch 服务随系统自动启动

$ chkconfig --add elasticsearch

此时,查询本地 9200 端口,会获取类似下面的反馈信息,表示服务启动成功。

$ curl -XGet localhost:9200
{
  "status" : 200,
  "name" : "Thunderball",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.4.4",
    "build_hash" : "c88f77ffc81301dfa9dfd81ca2232f09588bd512",
    "build_timestamp" : "2015-02-19T13:05:36Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.3"
  },
  "tagline" : "You Know, for Search"
}

安装 LogStash

从 http://www.elasticsearch.org/overview/logstash/download/ 下载源码包或安装包。

以 CentOS 下使用安装包为例:

$ yum install https://download.elasticsearch.org/logstash/logstash/packages/centos/logstash-1.4.2-1_2c0f5a1.noarch.rpm

默认安装到 /opt/logstash/ 目录下,配置文件在 /etc/logstash/ 目录下。

也可以通过添加 yum 源的方式。

$cat /etc/yum.repos.d/logstash.repo
[logstash-1.4]
name=logstash repository for 1.4.x packages
baseurl=http://packages.elasticsearch.org/logstash/1.4/centos
gpgcheck=1
gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch
enabled=1

测试安装是否成功

$ /opt/logstash/bin/logstash -e ‘input { stdin { } } output { stdout {} }‘

此时,随意从控制台输入内容后回车,会作为一条 log,显示出来。

LogStash 支持配置文件,下面通过配置文件配置 LogStash 将收到的 NetFlow 消息进行格式转换后发送到本机的 ElasticSearch。

假定 NetFlow 消息会被采集器发送到本地的 2055 端口,将消息转换后发送给本地的 elastic search 主机,索引名称为 "logstash_netflow5-%{+YYYY.MM.dd}"。

$ cat /etc/logstash/conf.d/netflow.conf
input {
    udp {
        port => 2055
        codec => netflow {
            definitions => "/opt/logstash/lib/logstash/codecs/netflow/netflow.yaml"
            versions => [5]
        }
    }
}

output {
    stdout { codec => rubydebug }
    elasticsearch {
        index => "logstash_netflow5-%{+YYYY.MM.dd}"
        host => localhost
    }
}

之后重启 logstash 服务。

安装 Kibana

下载压缩包,并解压。

$ axel https://download.elasticsearch.org/kibana/kibana/kibana-4.0.0-beta3.tar.gz
$ tar xzvf kibana-4.0.0-beta3.tar.gz
$ cd kibana-4.0.0-beta3

修改 config/kibana.yml,指定 elasticsearch 所在地址,默认认为在本地的 9200 端口。

运行 kibana。

$ ./bin/kibana

启动成功后,会打印如下的信息

The Kibana Backend is starting up... be patient
{"@timestamp":"2015-01-08T13:54:43+08:00","level":"INFO","name":"Kibana","message":"Kibana server started on tcp://0.0.0.0:5601 in production mode."}

通过浏览器访问本地的 5601 端口,因为没有数据,所以这个时候看不到任何数据信息。如果有了采集到的数据后,选择 logstash_netflow5-* 格式的索引,会查询到所有的 NetFlow 数据信息。

配置数据采集

配置 OpenvSwitch

利用下面的命令,创建一个新的网桥 ovs-br,作为要进行抓包的网桥,也可以使用已有网桥。

$ ovs-vsctl add-br ovs-br

并配置一个 IP 地址给网桥内部接口,否则后面启动 Docker 服务会报错(检查避免与 Docker 默认网桥冲突)。

$ ifconfig ovs-br 172.17.42.1/24

下面可以选择使用 NetFlow 采集还是 sFlow 采集,需要对应调整 logstash 中的配置。

打开 NetFlow

COLLECTOR_IP=127.0.0.1
COLLECTOR_PORT=2055
TIMEOUT=10
sudo ovs-vsctl -- set Bridge ovs-br netflow=@nf \
-- --id=@nf create NetFlow targets=\"${COLLECTOR_IP}:${COLLECTOR_PORT}\" active-timeout=${TIMEOUT}

清除 netflow 规则。

$ sudo ovs-vsctl clear Bridge ovsbr netflow

打开 sFlow

$ COLLECTOR_IP=127.0.0.1
$ COLLECTOR_PORT=6343
$ AGENT=eth0
$ HEADER=128
$ SAMPLING=512
$ POLLING=10

其中,AGENT 是从本地的哪个网卡发出流量到采集器。

$ sudo ovs-vsctl -- \
--id=@sflow create sflow agent=${AGENT} target=\"${COLLECTOR_IP}:${COLLECTOR_PORT}\" header=${HEADER} \
sampling=${SAMPLING} polling=${POLLING} \
-- set bridge ovs-br sflow=@sflow

清除 sflow 规则。

$ sudo ovs-vsctl clear Bridge ovsbr sflow

使用 Docker 容器生成流量

这里可以使用 Docker 容器来模拟主机发送流量。

在启动 Docker 服务的时候,使用 -b BRIDGE 或 --bridge=BRIDGE 来指定使用的网桥。或者修改 Docker 配置文件来添加参数并重启服务。

Ubuntu 下配置文件为 /etc/default/docker,重启服务命令为 service docker restart

CentOS 下配置文件为 /etc/sysconfig/docker,重启服务命令为 /etc/init.d/docker restart

搜索带有 iperf 的镜像。

$ docker search iperf
NAME                    DESCRIPTION                                     STARS     OFFICIAL   AUTOMATED
moutten/iperf           Dockerfile to setup an iperf server for ne...   0                    [OK]
xeor/iperf-server                                                       0                    [OK]
gdanii/iperf            Ubuntu 14.04 with iperf                         0
chengping/iperf         docker test with iperf                          0
m2i3/iperf                                                              0
kiratomato/iperf                                                        0
brendanburns/iperf                                                      0
mbennett/iperf_client   for internal testing                            0
mbennett/iperf_server                                                   0
mbennett/iperfbase                                                      0
mbennett/iperfserver                                                    0

这里可以任意选择一个,例如下载 gdanii/iperf 镜像。

$ docker pull gdanii/iperf

启动两个容器,一个当作服务端(地址为 172.17.0.2)

$ iperf -s

一个当作客户端(地址为 172.17.0.3),对服务端发起请求。

$ iperf -c 172.17.0.2

这个时候刷新 kibana 页面,就可以看到收集到的每条 flow 的信息了。一条 NetFlow 的信息可能长成下面的样子。

{
  "_index": "logstash_netflow5-2015.04.28",
  "_type": "logs",
  "_id": "tnUt34s8QCyN-E1mc46wqQ",
  "_score": 1,
  "_source": {
    "@version": "1",
    "@timestamp": "2015-04-28T03:02:27.760Z",
    "netflow": {
      "version": "5",
      "flow_seq_num": "190",
      "engine_type": "9",
      "engine_id": "9",
      "sampling_algorithm": "0",
      "sampling_interval": "0",
      "flow_records": "2",
      "ipv4_src_addr": "10.0.0.2",
      "ipv4_dst_addr": "10.0.0.4",
      "ipv4_next_hop": "0.0.0.0",
      "input_snmp": "2",
      "output_snmp": "3",
      "in_pkts": "10",
      "in_bytes": "980",
      "first_switched": "2015-04-28T02:46:57.760Z",
      "last_switched": "2015-04-28T03:02:27.759Z",
      "l4_src_port": "0",
      "l4_dst_port": "2048",
      "tcp_flags": "0",
      "protocol": "1",
      "src_tos": "0",
      "src_as": "0",
      "dst_as": "0",
      "src_mask": "0",
      "dst_mask": "0"
    },
    "host": "9.186.100.88"
  },
  "fields": {
    "netflow.last_switched": [
      1430190147759
    ],
    "@timestamp": [
      1430190147760
    ],
    "netflow.first_switched": [
      1430189217760
    ]
  }
}

这些信息作为记录被存放到了 elastic search 中。当然, ELK 套装的威力要远大于此,可以通过阅读官方文档进行学习。

当然也使用 kvm 启动两个 vm 挂载到 ovs-br 上进行测试或者利用本地主机作为客户端或服务端之一。这里不再赘述。

参考

  • ElasticSearch: http://elasticsearch.org
  • http://www.areteix.net/blog/2013/08/network-flow-monitoring-with-open-vswitch/
  • http://blogs.cisco.com/security/step-by-step-setup-of-elk-for-netflow-analytics
  • netflow-analyzer: http://www.solarwinds.com/products/freetools/netflow-analyzer.aspx
  • sFlowTrend: http://www.inmon.com/products/sFlowTrend.php
  • softflowd: https://code.google.com/p/softflowd

转载请注明:http://blog.csdn.net/yeasy/article/details/45332493

时间: 2024-07-30 03:44:15

使用 ElasticSearch + LogStash + Kibana 来可视化网络流量的相关文章

(高版本)ELK(Elasticsearch + Logstash + Kibana)服务服务搭建

一.ELK是什么鬼? ELK实际上是三个工具的集合,Elasticsearch + Logstash + Kibana,这三个工具组合形成了一套实用.易用的监控架构,很多公司利用它来搭建可视化的海量日志分析平台. 1. ElasticSearch ElasticSearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引

Elasticsearch+logstash+kibana实现日志分析(实验)

Elasticsearch+logstash+kibana实现日志分析(实验) 一.前言 Elastic Stack(旧称ELK Stack),是一种能够从任意数据源抽取数据,并实时对数据进行搜索.分析和可视化展现的数据分析框架.(hadoop同一个开发人员) java 开发的开源的全文搜索引擎工具 基于lucence搜索引擎的 采用 restful - api 标准的 高可用.高扩展的分布式框架 实时数据分析的 官方网站: https://www.elastic.co/products 为什么

基于ELK5.1(ElasticSearch, Logstash, Kibana)的一次整合测试

前言开源实时日志分析ELK平台(ElasticSearch, Logstash, Kibana组成),能很方便的帮我们收集日志,进行集中化的管理,并且能很方便的进行日志的统计和检索,下面基于ELK的最新版本5.1进行一次整合测试. ElasticSearch1.概述:ElasticSearch是一个高可扩展的开源的全文搜索分析引擎.它允许你快速的存储.搜索和分析大量数据.ElasticSearch通常作为后端程序,为需要复杂查询的应用提供服务.Elasticsearch是一个基于Lucene的开

Elasticsearch + Logstash + Kibana 搭建教程

# ELK:Elasticsearch + Logstash + Kibana 搭建教程 Shipper:日志收集者.负责监控本地日志文件的变化,及时把日志文件的最新内容收集起来,输出到Redis暂存.Indexer:日志存储者.负责从Redis接收日志,写入到本地文件.Broker:日志Hub,用来连接多个Shipper和多个Indexer.无论是Shipper还是Indexer,Logstash始终只做前面提到的3件事: Shipper从日志文件读取最新的行文本,经过处理(这里我们会改写部分

使用ElasticSearch+LogStash+Kibana+Redis搭建日志管理服务

1.使用ElasticSearch+LogStash+Kibana+Redis搭建日志管理服务 http://www.tuicool.com/articles/BFzye2 2.ElasticSearch+LogStash+Kibana+Redis日志服务的高可用方案 http://www.tuicool.com/articles/EVzEZzn 3.示例 开源实时日志分析ELK平台部署 http://baidu.blog.51cto.com/71938/1676798?utm_source=t

用ElasticSearch,LogStash,Kibana搭建实时日志收集系统

用ElasticSearch,LogStash,Kibana搭建实时日志收集系统 介绍 这套系统,logstash负责收集处理日志文件内容存储到elasticsearch搜索引擎数据库中.kibana负责查询elasticsearch并在web中展示. logstash收集进程收获日志文件内容后,先输出到redis中缓存,还有一logstash处理进程从redis中读出并转存到elasticsearch中,以解决读快写慢速度不一致问题. 官方在线文档:https://www.elastic.co

elasticsearch + logstash + kibana 搭建实时日志收集系统【原创】

实时日志统一收集的好处: 1.快速定位集群中问题机器 2.无需下载整个日志文件(往往比较大,下载耗时多) 3.可以对日志进行统计 a.发现出现次数最多的异常,进行调优处理 b.统计爬虫ip c.统计用户行为,做聚类分析等 基于上面的需求,我采用了 ELK(elasticsearch + logstash + kibana)的方案,安装方法请到他们的官网:https://www.elastic.co/ 上面查询,这里我主要讲讲我遇到的问题. ??????1.LVS 分发UDP请求不成功的问题???

Elasticsearch+Logstash+Kibana配置

Elasticsearch+Logstash+Kibana配置 关于Elasticsearch+Logstash+Kibana的安装有很多文章,这里不复述了,这里仅记录一些比较细节的内容. AWS EC2中安装需要的注意事项 9200,9300,5601端口要记得打开 elasticsearch的地址不要写外部IP,否则会很浪费data,写内部ip elasticsearch { host => "ip-10-160-94-102.ap-northeast-1.compute.intern

CentOS7使用Elasticsearch+ Logstash+kibana快速搭建日志分析平台

CentOS7使用Elasticsearch+ Logstash+kibana快速搭建日志分析平台 介绍: 安装logstash,elasticsearch,kibana三件套,搜索程序一般由索引链及搜索组件组成. 索引链功能的实现需要按照几个独立的步骤依次完成:检索原始内容.根据原始内容来创建对应的文档.对创建的文档进行索引. 搜索组件用于接收用户的查询请求并返回相应结果,一般由用户接口.构建可编程查询语句的方法.查询语句执行引擎及结果展示组件组成. Elasticsearch是个开源分布式搜