收集、分析线上日志数据实战——ELK

本文来自网易云社区

作者:田躲躲

用户行为统计(User Behavior Statistics, UBS)一直是互联网产品中必不可少的环节,也俗称埋点。对于产品经理,运营人员来说,埋点当然是越多,覆盖范围越广越好。通过用户行为分析系统可洞悉用户基本操作习惯、探析用户心理。通过行为数据的补充,构建出精细、完整的用户画像,对不同特征用户做个性化营销,提升用户体验。让产品设计人员准确评估用户行为路径转化、产品改版优良、某一新功能对产品的影响几何,让运营人员做精准营销并且评估营销结果等。

目前所负责项目前期采用了前后端约定字段,埋点统计用户操作行为。数据存放在DDB中。如果用户行为日志非常大的话,这种方式肯定是不可行的。故采用了目前比较成熟的ELK代替之前的统计流程。本篇文章主要介绍ELK集群搭建,基本API封装,以及遇到的一些坑。

Elasticsearch

Elasticsearch是一个基于Lucene构建的开源、分布式、RESTful风格的搜索引擎。它被设计用于云计算中,具有实时搜索负载、稳定、快速、安装使用方便等优点。(之前用过SolrCloud,ES对用户的侵入性简直可以忽略)

集群安装:

每台机器先配置elasticsearch.yml,主要配置信息如下:

#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: es-commenta-event #其他机器集群名称应该保持一致
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: es-node-c1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /opt/elk/elasticsearch-5.1.1/data
#
# Path to log files:
#
path.logs: /opt/elk/elasticsearch-5.1.1/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: 192.168.140.133 #本机器host
#
# Set a custom port for HTTP:
#
#http.port: 9200
#
# For more information, see the documentation at:
# <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when new node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.zen.ping.unicast.hosts: ["192.168.140.133",  "192.168.140.134", "192.168.140.135"] #集群host列表

# Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
#
discovery.zen.minimum_master_nodes: 2

集群启动:

Q1:can not run elasticsearch as root

因为是本地虚拟机root安装的,启动的时候会报这个错。解决方案是:

group esgroup 
useradd esuser -g esgroup -p espassword
chown -R esuser:esgroup /etc/
chown -R esuser:esgroup /opt/

切换到esuser用户即可执行启动命令。

Q2:Unsupported major.minor version 52.0

目前安装的ES版本为5.1.1,需要Jdk1.8的版本,故安装下Jdk1.8,配置下环境变量,即可执行启动命令。

Q3:max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

ES启动占用更大的内存。修改如下:

sysctl -w vm.max_map_count=262144

每个ES服务设置好后,就可以真正启动了。依次启动机器的时候,可以看下机器日志是否有node加入到集群。如:

curl ‘192.168.140.133:9200‘{    "name": "es-node-c1", 
    "cluster_name": "es-commenta-event", 
    "cluster_uuid": "wi_1VOWoRqecjIht3Ra3mg", 
    "version": {        "number": "5.1.1", 
        "build_hash": "5395e21", 
        "build_date": "2016-12-06T12:36:15.409Z", 
        "build_snapshot": false, 
        "lucene_version": "6.3.0"
    }, 
    "tagline": "You Know, for Search"}

目前有3台虚拟机,默认ES有5个节点,可以通过命令创建3个节点的index,每个主节点有一个复制节点。

curl -XPUT ‘http://192.168.140.133:9200/commenta‘ -d ‘{"settings" : {"number_of_shards" : 3,"number_of_replicas" : 1}}‘

集群状态:

curl ‘http://192.168.140.133:9200/_cluster/health?pretty‘{  "cluster_name" : "es-commenta-event",  "status" : "green",  "timed_out" : false,  "number_of_nodes" : 3,  "number_of_data_nodes" : 3,  "active_primary_shards" : 3,  "active_shards" : 6,  "relocating_shards" : 0,  "initializing_shards" : 0,  "unassigned_shards" : 0,  "delayed_unassigned_shards" : 0,  "number_of_pending_tasks" : 0,  "number_of_in_flight_fetch" : 0,  "task_max_waiting_in_queue_millis" : 0,  "active_shards_percent_as_number" : 50.0}

安装插件:

通过类SQL转化成DSL
bin/elasticsearch-plugin install install https://github.com/NLPchina/elasticsearch-sql/releases/download/5.1.1.0/elasticsearch-sql-5.1.1.0.zip
X-Pack集成了权限、监控等功能,是一款非常有用的插件。但是商用的,收费。
bin/elasticsearch-plugin install x-pack

Logstash

Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置。

安装:

到官网下载logstash5.1.1版本即可。

启动:

1、无配置文件启动

bin/logstash -e ‘input{ stdin{} } output{ stdout{} }‘Sending Logstash‘s logs to /home/webedit/logstash/logstash-5.1.1/logs which is now configured via log4j2.properties
The stdin plugin is now waiting for input:
[2017-04-27T15:47:38,023][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-04-27T15:47:38,039][INFO ][logstash.pipeline        ] Pipeline main started
[2017-04-27T15:47:38,115][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
hello elastic
2017-04-27T07:49:00.966Z localhost.localdomain hello elastic

logstash会采集命令行输入的命令

2、配置文件启动

假设我们需要采集的日志记录是这种格式的:

INFO  [17.04.27 16:12:12][com.netease.mail.vip.commenta.filter.EventLogFilter]: |44171|1|1|1|1493280732227|0.0|123.58.160.131|133001|COMMENTA-B54C43F5-4FCB-4D10-B9EC-67862FBF0055|1493280732440|huiping_mp|0.7.0|null|1|

如何采集这种格式的日志呢?这里采用正则表达式去匹配,具体配置文件如下:

input {

file {
    type => "commenta"
    path => ["/home/logs/commenta/stdout.log"]
    start_position => "beginning"
    codec => plain { charset => "Windows-1252" }
}

}

filter {if [type] == "commenta" {
    grok {
      match => { "message" => "%{DATA:className}\|%{BASE16FLOAT:id}\|%{DATA:eventType:int}\|%{DATA:page:int}\|%{DATA:eventFrom:int}\|%{DATA:eventTime}\|%{BASE16FLOAT:eventWeight}\|%{DATA:ip}\|%{BASE16FLOAT:userId}\|%{DATA:uniqueCode}\|%{DATA:createTime}\|%{DATA:clientFrom}\|%{DATA:appVersion}\|%{DATA:data}\|%{DATA:eventStep:int}\|"}
          remove_field => ["message"]
        }
}if ‘_grokparsefailure‘ in [tags] { #过滤掉不匹配的事件
	drop{}
}

	mutate  { #数据类型转换  
                        convert => [ "eventWeight", "float"]
                        convert => [ "id", "float"]
                        convert => [ "userId", "float"]
                }

}
output{
       
       stdout { codec => rubydebug } #打印出行为日志记录在控制台

       elasticsearch{
             hosts => ["192.168.140.133:9200","192.168.140.134:9200","192.168.140.135:9200"]
             index => "commenta"
        }
}

下面我们可以启动logstash看下效果:

./bin/logstash -f ./config/logstash.conf{     "appVersion" => "0.7.0",           "data" => "null",             "ip" => "XXXXXXXXX",      "className" => "INFO  [17.04.27 16:12:12][com.netease.mail.vip.commenta.filter.EventLogFilter]: ",      "eventType" => 1,           "type" => "commenta",    "eventWeight" => 0.0,         "userId" => 133001.0,           "tags" => [],           "path" => "/home/logs/commenta/stdout.log",     "@timestamp" => 2017-04-27T08:18:58.245Z,     "uniqueCode" => "COMMENTA-B54C43F5-4FCB-4D10-B9EC-67862FBF0055",     "createTime" => "1493280732440",       "@version" => "1",           "host" => "testfb-m126-161",      "eventTime" => "1493280732227",      "eventStep" => 1,     "clientFrom" => "huiping_mp",             "id" => 44171.0,           "page" => 1,      "eventFrom" => 1}

通过打印在控制台的日志可以看到我们已经通过logstash收集到了行为日志记录(部分数据已脱敏)。当然我们也可以通过Kibana看到这些数据,下部分将会讲到。

3、启动问题

Q1:Unsupported major.minor version 52.0

使用的是Logstash版本为5.1.1,需要Jdk1.8的环境,故安装下Jdk1.8,配置下环境变量,即可执行启动命令。

Q2:unknown setting host for elasticsearch

配置Logstash的启动文件时,注意版本的问题,如host-->hosts

Kibana

Kibana是一个开源的分析与可视化平台,设计出来用于和Elasticsearch一起使用的。你可以用kibana搜索、查看、交互存放在Elasticsearch索引里的数据,使用各种不同的图表、表格、地图等kibana能够很轻易地展示高级数据分析与可视化。

安装:

到官网下载Kibana5.1.1版本即可。

启动:

主要配置如下:

# Kibana is served by a back end server. This setting specifies the port to use.
#server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is ‘localhost‘, which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "192.168.140.133"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy. This only affects
# the URLs generated by Kibana, your proxy is expected to remove the basePath value before forwarding requests
# to Kibana. This setting cannot end in a slash.
#server.basePath: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server‘s name.  This is used for display purposes.
#server.name: "your-hostname"

# The URL of the Elasticsearch instance to use for all your queries.
elasticsearch.url: "http://192.168.140.133:9200"
.......

启动成功后,我们可以监控commenta*的索引(安装ES的时候,创建了)

bin/kibana

这时候就可以看到Logstash收集到的数据日志了

当然我们也可以配置一些统计:

为了更直观的展示,我们可以把统计“拖拽”到Dashboard中。

至此,ELK已经搭建完成,并提供一些简单的功能。 但是有一些统计Kibana是做不了的。这时候我们程序需要处理一下。

Java API

HandleEsClientServer

/* ES服务器列表 */
    private String serverList;    /* 设置client.transport.sniff为true来使客户端去嗅探整个集群的状态,把集群中其它机器的ip地址加到客户端中,它会自动帮你添加,并且自动发现新加入集群的机器 */
    private Boolean sniff = false;    /* 集群名称 */
    private String clusterName;    /* 连接客户端 */
    private Client client;    /* 搜索基本工具类 */
    private SearchDao searchDao;    public HandleEsClientServer() {
    }    public HandleEsClientServer(String serverList, Boolean sniff, String clusterName) {        this.serverList = serverList;        this.sniff = sniff;        this.clusterName = clusterName;
    }    @Override
    public void afterPropertiesSet() throws Exception {

        logger.info("es server start at time={}, serverList={}, clusterName={}, sniff={}", DateUtil.toStr(new Date(),DateUtil.YYYY_MM_DD_HH_MM_SS),
                serverList, clusterName, sniff);        if (this.getServerList() == null || this.getServerList().length() == 0) {
            logger.error("es serverList is null...");            return;
        }

        List clusterList = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(this.getServerList());

        List transportAddresses = new ArrayList<>();        for (String cluster : clusterList) {
            List host = Splitter.on(":").trimResults().omitEmptyStrings().splitToList(cluster);
            String ip = host.get(0);
            Integer port = Integer.valueOf(host.get(1));            try {
                transportAddresses.add(new InetSocketTransportAddress(InetAddress.getByAddress(getIpByte(ip)), port == null ? 9300 : port));
            } catch (UnknownHostException e) {
                logger.error("init es client error={} at time={} ", e, DateUtil.toStr(new Date(),DateUtil.YYYY_MM_DD_HH_MM_SS));                return;
            }
        }        //配置启动参数
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName)
                .put("client.transport.sniff", sniff)
                .build();        //初始化Client
        this.client = new PreBuiltTransportClient(settings)
                .addTransportAddresses(transportAddresses.toArray(new TransportAddress[transportAddresses.size()]));        this.searchDao = new SearchDao(this.client);

        logger.info("es server start success at time={}", DateUtil.toStr(new Date(),DateUtil.YYYY_MM_DD_HH_MM_SS));

    }

HandleEsData

 /**
     * 根据elasticsearch-sql插件的sql语句查询结果。
     * @param query
     * @return
     * @throws SqlParseException
     * @throws SQLFeatureNotSupportedException
     */
    public SqlResponse selectBySQL(String query) throws SqlParseException, SQLFeatureNotSupportedException {

        logger.info("selectBySQL, query={}",query);        try{
            SqlElasticSearchRequestBuilder select = (SqlElasticSearchRequestBuilder) searchDao.explain(query).explain();            return new SqlResponse((SearchResponse)select.get());
        }catch (Exception e){
            logger.error(e.getMessage(),e);
        }        return null;

    }/**
     * 批量插入数据,使用Obj的id字段。
     * @param _index
     * @param _type
     * @param data
     * @param generate_id
     * @param 
     * @return
     */
    public  BulkResponse batchObjIndex(String _index, String _type, List data, boolean generate_id){

        logger.info("batchObjIndex, index={}, type={}, data={}, generate_id={}", _index, _type, data, generate_id);

        Assert.notEmpty(data, "data is not allowed empty");

        BulkRequestBuilder bulkRequest = client.prepareBulk();        for (T tObj : data) {
            Class clazz = tObj.getClass();
            String json = JSONObject.toJSONString(tObj, SerializerFeature.WriteMapNullValue);            if(generate_id){
                bulkRequest.add(client.prepareIndex(_index.toLowerCase(), _type.toLowerCase()).setSource(json));
            } else {                try {
                    Object value = clazz.getDeclaredMethod("getId").invoke(tObj);
                    String _id = String.valueOf(value);
                    bulkRequest.add(client.prepareIndex(_index.toLowerCase(), _type.toLowerCase(), _id).setSource(json));
                } catch (Exception e) {
                    logger.error(e.getMessage(),e);
                }
            }
        }        return bulkRequest.execute().actionGet();
    }

参考资料:

http://www.learnes.net/

http://udn.yyuap.com/doc/logstash-best-practice-cn/

https://www.gitbook.com/book/chenryn/elk-stack-guide-cn/details

https://www.elastic.co/guide/en/elasticsearch/reference/5.1/getting-started.html

https://www.elastic.co/guide/en/logstash/5.1/getting-started-with-logstash.html

https://www.elastic.co/guide/en/kibana/5.1/getting-started.html

http://elasticsearch.cn/

网易云免费体验馆,0成本体验20+款云产品!

更多网易研发、产品、运营经验分享请访问网易云社区

相关文章:
【推荐】 HTTP/2部署使用
【推荐】 类似gitlab代码提交的热力图怎么做?

原文地址:https://www.cnblogs.com/zyfd/p/9790253.html

时间: 2024-10-10 09:52:57

收集、分析线上日志数据实战——ELK的相关文章

Elastic Stack实战学习教程~日志数据的收集、分析与可视化

Elastic Stack介绍 近几年,互联网生成数据的速度不断递增,为了便于用户能够更快更精准的找到想要的内容,站内搜索或应用内搜索成了不可缺少了的功能之一.同时,企业积累的数据也再不断递增,对海量数据分析处理.可视化的需求也越来越高. 在这个领域里,开源项目ElasticSearch赢得了市场的关注,比如,去年Elastic公司与阿里云达成合作伙伴关系提供阿里云 Elasticsearch 的云服务.今年10月Elastic公司上市,今年11月举行了Elastic 中国开发者大会.目前各大云

大数据时代日志分析平台ELK的搭建

A,首先说说ELK是啥,  ELK是ElasticSearch . Logstash 和 Kiabana 三个开源工具组成.Logstash是数据源,ElasticSearch是分析数据的,Kiabana是展示数据用的 B,开始搞 1,安装 Logstash 依赖包 JDK wget http://download.oracle.com/otn-pub/java/jdk/8u45-b14/jdk-8u45-linux-x64.tar.gz 要是没有wget可以yum -y install wge

万能日志数据收集器 Fluentd - 每天5分钟玩转 Docker 容器技术(91)

前面的 ELK 中我们是用 Filebeat 收集 Docker 容器的日志,利用的是 Docker 默认的 logging driver json-file,本节我们将使用 fluentd 来收集容器的日志. Fluentd 是一个开源的数据收集器,它目前有超过 500 种的 plugin,可以连接各种数据源和数据输出组件.在接下来的实践中,Fluentd 会负责收集容器日志,然后发送给 Elasticsearch.日志处理流程如下: 这里我们用 Filebeat 将 Fluentd 收集到的

6.3.1版本elk+redis+filebeat收集docker+swarm日志分析

最近公司比较忙,没来的及更新博客,今天为大家更新一篇文章,elk+redis+filebeat,这里呢主要使用与中小型公司的日志收集,如果大型公司 可以参考上面的kafka+zookeeper配合elk收集,好了开始往上怼了: Elk为了防止数据量突然键暴增,吧服务器搞奔溃,这里需要添加一个redis,让数据输入到redis当中,然后在输入到es当中 Redis安装: #!/bin/bash # 6379  Redis-Server tar zxf redis-3.0.0-rc5.tar.gz

Zabbix与ELK整合实现对日志数据的实时监控

4.2.zabbix平台配置日志告警 一. ELK与zabbix有什么关系? ELK大家应该比较熟悉了,zabbix应该也不陌生,那么将ELK和zabbix放到一起的话,可能大家就有疑问了?这两个放到一起是什么目的呢,听我细细道来. ELK是一套日志收集套件,它其实有由Elasticsearch.Logstash和Kibana三个软件组成,通过ELK可以收集系统日志.网站日志.应用系统日志等各种日志数据,并且还可以对日志进行过滤.清洗,然后进行集中存放并可用于实时检索.分析.这是ELK的基础功能

Kubernetes运维之使用ELK Stack收集K8S平台日志

kubernetes运维之使用elk Stack收集k8s平台日志目录: 收集哪些日志 elk Stack日志方案 容器中的日志怎么收集 k8S平台中应用日志收集 一.收集哪些日志 ? k8s系统的组件日志 比如kubectl get cs下面的组件 master节点上的controller-manager,scheduler,apiservernode节点上的kubelet,kube-proxy? k8s Cluster里面部署的应用程序日志 标准输出 日志文件elk Stack日志方案,改怎

android app崩溃日志收集以及上传

源码获取请到github:https://github.com/DrJia/AndroidLogCollector 已经做成sdk的形式,源码已公开,源码看不懂的请自行google. 如果想定制适应自己app的sdk请自行fork. AndroidLogCollector android app崩溃日志收集sdk 1.0 作者:贾博士 崩溃日志收集方法: 1.LogCollector是lib包,在需要添加崩溃日志sdk的工程中导入此包. 2.导入lib后,在自己的工程的AndroidManife

从Apache的日志文件收集和提供统计数据(一个Python插件架构的简单实现)

从Apache的日志文件收集和提供统计数据 这一章我们将介绍基于插件程序的架构和实现.作为例子,我们将构建一个分析Apache服务器log文件的框架.这一次我们不再使用单片机的方式来创建,而是改为采用模块化的方式.一旦我们有了一个基本框架,我们就可以为它创建一个插件.这个插件可以基于请求者的地理位置执行分析. 程序的结构和功能 在数据维护和统计收集领域,很难有一个单一的应用程序可以适合多个用户的需求.让我们以分析Apache的web服务器日志文件为例.web服务器接受到的每一个请求都被记录在日志

Spark2.0从入门到精通:Scala编程、大数据开发、上百个实战案例、内核源码深度剖析视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv