ELK5.2+kafka+zookeeper+filebeat集群部署

架构图

  • 考虑到日志系统的可扩展性以及目前的资源(部分功能复用),整个ELK架构如下:

架构解读 : (整个架构从左到右,总共分为5层)

第一层、数据采集层

最左边的是业务服务器集群,上面安装了filebeat做日志采集,同时把采集的日志分别发送给两个logstash服务(2.187、2.189)

第二层、数据处理层,数据缓存层

logstash服务把接受到的日志经过格式处理,转存到本地的kafka broker+zookeeper 集群中。

第三层、数据转发层

这个单独的Logstash(2.184)节点会实时去kafka broker集群拉数据,转发至ES DataNode。

第四层、数据持久化存储

ES DataNode 会把收到的数据,写磁盘,建索引库。

第五层、数据检索,数据展示

ES Master + Kibana 主要 协调 ES集群,处理数据检索请求,数据展示。

服务器资源以及软件版本 
- 操作系统:centos7.2、虚拟机

  • 服务器角色(研究环境)
192.168.2.184 elastic、kafka、 logstash-out-from-kafka zookeeper  
192.168.2.187 elastic、kafka、logstash-in-to-kafka、zookeeper  
192.168.2.189 elastic、kafka、logstash-in-to-kafka、zookeeper、kibana  

主机ip

部署服务

服务器配置
  • 群星日志系统服务器角色(测试环境)

主机ip

部署服务

服务器配置

192.168.2.130 ExceptionLess  
192.168.2.131 kafka、 logstash-out-from-kafka zookeeper opskafka.manjinba.cn
192.168.2.132 kafka、logstash-in-to-kafka、zookeeper opskafka.manjinba.cn
192.168.2.133 kafka、logstash-in-to-kafka、zookeeper opskafka.manjinba.cn
192.168.2.135 elastic opselastic.manjinba.cn
192.168.2.136 elastic opselastic.manjinba.cn
 192.168.2.138 kibana kibana.manjinba.cn

软件版本: 
jdk-8u151-linux-x64 
elasticsearch-5.2.2             wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.2.2.tar.gz

kafka_2.10-0.10.2.0            wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
kafka-manager                    wget https://github.com/yahoo/kafka-manager/archive/master.zip
kibana-5.2.2-linux-x86_64  wget https://artifacts.elastic.co/downloads/kibana/kibana-5.2.2-linux-x86_64.tar.gz
logstash-5.2.2                  wget https://artifacts.elastic.co/downloads/logstash/logstash-5.2.2.tar.gz
zookeeper-3.4.9               wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz

filebeat-5.2.2                    wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-5.2.2-linux-x86_64.tar.gz

安装部署

  • 系统优化

    cat /etc/sysctl.conf

    net.ipv4.tcp_max_syn_backlog = 4096

    net.core.netdev_max_backlog = 2048

    net.ipv4.tcp_fin_timeout = 15

    net.ipv4.tcp_tw_reuse = 1

    net.ipv4.tcp_tw_recycle = 1

    net.ipv4.tcp_syncookies = 1

    vm.max_map_count= 262144 #后期配置ES很关键

    vm.swappiness = 1

    cat /etc/security/limits.conf

    *                               soft    nofile  65536

    *                               hard    nofile  65536

  • 配置java环境

    cd /apps/svr

    tar zxvf jdk-8u151-linux-x64.tar.gz

    ln -s jdk1.8.0_151 jdk

     

    cat >> /etc/profile <<EOF

    export JAVA_HOME=/apps/svr/jdk

    export PATH=$JAVA_HOME/bin:$PATH

    export CLASSPATH=.:\$JAVA_HOME/lib/dt.jar:\$JAVA_HOME/lib/tools.jar

    EOF

     

    source /etc/profile

  • 用户问题 
    为了方便这里所有的应用全部都在apps帐号下 
    useradd apps && echo "Qwer1234" | passwd --stdin apps
  • python升级以及安装supervisor

    cat update_python.sh

    #!/bin/bash

    #creat by xiaojs

    if [ whoami != ‘root‘ ]

    then

    exit 1

    fi

    if [[ python -c "import platform ;print platform.python_version()" = 2.7.* ]]

    then

    echo ‘you need not do everything‘

    exit 0

    else

    echo ‘============================‘

    echo ‘=======start update========‘

    fi

    # get the tar

    cd /usr/local/src

    wget http://ops.bubugao-inc.com/python/Python-2.7.8.tgz

    wget http://ops.bubugao-inc.com/python/pyinotify.tar.gz

    wget http://ops.bubugao-inc.com/python/MySQL-python-1.2.4.zip

    ##

    yum -y install git gcc mysql mysql-devel

    #install

    tar zxvf Python-2.7.8.tgz

    cd Python-2.7.8

    ./configure --prefix=/usr/local/python2.7.8

    make && make install

    mv /usr/bin/python /usr/bin/python_old

    ln -s /usr/local/python2.7.8/bin/python /usr/bin/

    sed -i ‘s/python/python_old/1‘ /usr/bin/yum

    #intall the plugin

    cd ..

    tar zxvf pyinotify.tar.gz

    cd pyinotify

    python setup.py install

    cd ..

    unzip MySQL-python-1.2.4.zip

    cd MySQL-python-1.2.4

    python setup.py install

    ####install supervisor

    cd /usr/local/src

    wget --no-check-certificate https://bootstrap.pypa.io/ez_setup.py -O - | sudo python

    wget http://pypi.python.org/packages/source/d/distribute/distribute-0.6.10.tar.gz

    tar xf distribute-0.6.10.tar.gz

    cd distribute-0.6.10

    python setup.py install

    easy_install supervisor

    cd /usr/local/python2.7.8/bin/

    cp supervisord supervisorctl echo_supervisord_conf /usr/bin/

    mkdir /etc/supervisor && cd /etc/supervisor

    wget http://ops.bubugao-inc.com/python/supervisord.conf

  • 安装elasticsearch 
    cd /apps/svr/ 
    tar zxvf elasticsearch-5.2.2.tar.gz 
    ln -s elasticsearch-5.2.2 elasticsearch 
    [[email protected] elasticsearch]# sed -n /^[^#]/p config/elasticsearch.yml

    cluster.name: SuperApp

    node.name: manjinba01

    network.host: 0.0.0.0

    http.port: 9200

    discovery.zen.ping.unicast.hosts: ["192.168.2.184:9300","192.168.2.187:9300","192.168.2.189:9300"]

    discovery.zen.minimum_master_nodes: 1

    bootstrap.system_call_filter: false

    bootstrap.memory_lock: false

    http.cors.enabled: true

    http.cors.allow-origin: "*"

启动elasticsearch

chown -R apps.apps /apps

su - apps

cd /apps/svr/elasticsearch

bin/elasticsearch -d

  • 另外两台类似,后续会安装x-pack,所以以前的head和bigdesk不用安装
  • zookeeper+kafka集群部署

#zookeeper 
cd /apps/svr 
tar zxvf zookeeper-3.4.9.tar.gz 
ln -s zookeeper-3.4.9 zookeeper 
mkdir -p /apps/dbdat/zookeeper 
[[email protected] zookeeper]# sed -n ‘/^[^#]/p’ conf/zoo.cfg 
tickTime=2000 
initLimit=10 
syncLimit=5 
dataDir=/apps/dbdat/zookeeper 
clientPort=2181 
server.1=192.168.2.184:12888:13888 
server.2=192.168.2.187:12888:13888 
server.3=192.168.2.189:12888:13888

#三台服务器分别赋值 
echo 1 > /apps/dbdat/zookeeper/myid 
echo 2 > /apps/dbdat/zookeeper/myid 
echo 3 > /apps/dbdat/zookeeper/myid

#启动并查看状态 
/apps/svr/zookeeper/bin/zkServer.sh start 
/apps/svr/zookeeper/bin/zkServer.sh status 
[[email protected] zookeeper]# /apps/svr/zookeeper/bin/zkServer.sh status 
ZooKeeper JMX enabled by default 
Using config: /apps/svr/zookeeper/bin/../conf/zoo.cfg 
Mode: follower 
#以上信息就是没问题

#kafka集群

cd /apps/svr

 tar zxvf kafka_2.10-0.10.2.0.tgz

ln -s kafka_2.10-0.10.2.0 kafka

[[email protected] src]# sed -n ‘/^[^#]/p‘ /apps/svr/kafka/config/server.properties

broker.id=1

delete.topic.enable=true

listeners=PLAINTEXT://192.168.2.184:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/apps/logs/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=192.168.2.184:2181,192.168.2.187:2181,192.168.2.189:2181

zookeeper.connection.timeout.ms=6000

#不同的节点,注意broker.id和linsten的ip

\#启动查看是否正常

nohup /apps/svr/kafka/bin/kafka-server-start.sh /apps/svr/kafka/config/server.properties &

\#有一些用得到的指令

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test  # 创建topic

bin/kafka-topics.sh --list --zookeeper localhost:2181   # 查看已经创建的topic列表

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test  # 查看topic的详细信息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 发送消息, 回车后模拟输入一下消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test # 消费消息, 可以换到其他kafka节点, 同步接收生产节点发送的消息

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 6  # 给topic增加分区

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test1  # 删除已经创建的topic, 前提是开了delete.topic.enable=true参数

如果还不能删除, 可以到zookeeper中去干掉它

cd /usr/local/zookeeper-3.4.10/

bin/zkCli.sh

ls /brokers/topics            # 查看topic

rm -rf /brokers/topics/test1     # 删除topic

logstash的部署和配置

cd /apps/svr 
tar zxvf logstash-5.2.2.tar.gz 
ln -s logstash-5.2.2/ logstash 
#安装都一样,重点是两端配置文件不一样,一个是负责写入kafka,一个是负责从kafka提取出来写入elasticsearch,配置分别如下: 
[[email protected] ~]# cat /apps/conf/logstash/logstash-in-kafka.conf

  input {

            beats {

            port => 5044

            }

    }

    output {

    if [type] == "nginx-accesslog" {

    kafka {

            bootstrap_servers => "192.168.2.184:9092,192.168.2.187:9092,192.168.2.189:9092"

            topic_id => "nginx-accesslog"

        }

    }

        if [type] == "tomcat-log" {

        kafka {

                bootstrap_servers => "192.168.2.184:9092,192.168.2.187:9092,192.168.2.189:9092"

                topic_id => "tomcat-log"

                }

        }

        if [type] == "sys-messages" {

        kafka {

                bootstrap_servers => "192.168.2.184:9092,192.168.2.187:9092,192.168.2.189:9092"

                topic_id => "sys-messages"

                }

        }

}

[[email protected] ~]$ cat /apps/conf/logstash/logstash-kafka.conf

input {

kafka{

bootstrap_servers => "192.168.2.184:9092,192.168.2.187:9092,192.168.2.189:9092"

topics => "nginx-accesslog"

    consumer_threads => 50

    decorate_events => true

type => "nginx-accesslog"

}

    kafka{

    bootstrap_servers => "192.168.2.184:9092,192.168.2.187:9092,192.168.2.189:9092"

    topics => "sys-messages"

    consumer_threads => 50

    decorate_events => true

    type => "sys-messages"

    }

    kafka{

    bootstrap_servers => "192.168.2.184:9092,192.168.2.187:9092,192.168.2.189:9092"

    topics => "tomcat-log"

    consumer_threads => 50

    decorate_events => true

    type => "tomcat-log"

    }

}

filter {

    if [type] == "nginx-accesslog" {

            grok {

                    match => ["message","%{IPORHOST:client_ip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:http_version})?|%{DATA:rawrequest})\" (?:%{URIHOST:domain}|-) %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:agent} %{QS:x_forword} %{QS:upstream_host} %{QS:upstream_response} (%{WORD:upstream_cache_status}|-) %{QS:upstream_content_type} %{QS:upstream_response_time} > (%{BASE16FLOAT:request_time}) \"(%{NGINXUID:uid}|-)\""]

            }

            date {

                    locale => "en_US"

                    match => ["timestamp""dd/MMM/yyyy:HH:mm:ss Z"]

                    remove_field => [ "timestamp" ]

            }

}

if [type] == "tomcat-log" {

           grok {

        match => {"message" =>  "((app=(?<app>[^,]*)\,?))(\s*)((app0=(?<app0>[^,]*)\,?)?)(\s*)((app1=(?<app1>[^,]*)\,?)?)(.*\, host)(=(?<host>[^,]*)\,)(\s*)(pid=(?<pid>[^,]*)\,)(\s*)((t0=(?<t0>[^,]*)\,)?)(\s*)(trackId=(?<trackId>[a-zA-Z0-9]+)\})(\s*)(\[(?<time>[^]]*)\])(\s*)(\[(?<loglevel>DEBUG|INFO|WARN|ERROR)\])((.*\"time\":(?<apitime>\d+)\,\"code\":(?<apicode>\"[^\"]*\")\,\"msg\":(?<apimsg>\"[^\"]*)\"\})?)(.*\[Cost)?((\s+(?<Cost>\d+)ms\])?)"}

    }

}

mutate {

    #convert => {"Cost" => "integer"}

    convert => ["Cost","integer","request_time","integer","response","integer","upstream_response","integer"]

}

}

output {

    elasticsearch {

    hosts => ["192.168.2.184:9200","192.168.2.187:9200","192.168.2.189:9200"]

    user => elastic

    password => changeme

    index => "logstash-%{type}-%{+YYYY.MM.dd}"

    manage_template => true

    flush_size => 50000

    idle_flush_time => 10

   }

}

启动logstash

192.168.2.184

nohup /apps/svr/logstash/bin/logstash -f /apps/conf/logstash/logstash-kafka.conf &

192.168.2.187/192.168.2.189

nohup /apps/svr/logstash/bin/logstash -f /apps/conf/logstash/logstash-in-kafka.conf &

应用服务器的filebeat的配置

 

cd /apps/svr

tar zxvf filebeat-5.2.2-linux-x86_64.tar.gz

ln -s filebeat-5.2.2-linux-x86_64 filebeat

[[email protected] svr]# sed -n ‘/^[^#]/’p filebeat/filebeat.yml

filebeat.prospectors:

 

- input_type: log

  paths:

    - /var/log/messages

  document_type: sys-messages

output.logstash:

# The Logstash hosts

 hosts: ["192.168.2.187:5044","192.168.2.189:5044"]

#调试指令:./filebeat -e -c filebeat.yml -d “production”

启动: nohup ./filebeat -c filebeat.yml -e &

kibana页面配置

cd /apps/svr

tar zxvf kibana-5.2.2-linux-x86_64.tar.gz

ln -s kibana-5.2.2-linux-x86_64 kibana

[[email protected] kibana]# sed -n ‘/^[^#]/’p config/kibana.yml

server.port: 5601

server.host: "192.168.2.189"

elasticsearch.url: "http://192.168.2.189:9200"

kibana.index: ".kibana"

启动kibana

nohup bin/kibana &

#对应的nginx的配置如下

upstream kibana {

        keepalive      400;

           server  192.168.2.184:5601 max_fails=3  fail_timeout=30s;

}

server  {

    listen          80;

    server_name     192.168.2.184;

    if (-d $request_filename) {

        rewrite ^/(.*)([^/])$ http://$host/$1$2/ permanent;

    }

    location / {

        proxy_pass              http://kibana;

        proxy_http_version 1.1;

        proxy_set_header Connection "";

        proxy_set_header        X-Real-IP  $remote_addr;

        proxy_set_header        Host             $host;

        proxy_set_header        X-Forwarded-For  $proxy_add_x_forwarded_for;

    }

    error_log           logs/kinaba5.error.log;

    access_log          logs/kinaba5.access.log log_access;

}

#至此,整个框架已经完成,可以先建立kafka的topic测试,然后观察elasticsearch的索引是否建立成功,或简单的从页面观察即可

  • 插件和其他相关

    1、由于上述大部分应用都是跑在后台,有时候进程是否挂掉,不得而知,监控如果对于每个进程监控略显麻烦,而且不方便启动,所以这里用supervisor进行统一管理,上述已经有安装记录,具体的配置就不做展示了 
    2、 x-pack的安装 
    /apps/svr/kibana/bin/kibana-plugin install x-pack

原文地址:https://www.cnblogs.com/larry-luo/p/11133395.html

时间: 2024-08-04 08:11:51

ELK5.2+kafka+zookeeper+filebeat集群部署的相关文章

Kafka 入门之集群部署遇到问题

最近,因为上级主管部门需要通过使用Kafka向其传输文件,又因为此前没有接触过kafka,所以在部署测试kafka程序期间遇到很多问题,在这里总结4个问题与1个建议,方便入门者参考也便于遇到类似问题进行查阅完善. 1.Kafka java代码与Kafka 软件的关系 Kafka java代码与Kafka 软件之间究竟有什么关系呢?Kafka java代码中已经使了kafka-clients-0.8.2.1.jar,kafka_2.11-0.8.2.1.jar,那么还需要安装kafka_2.11-

ZooKeeper分布式集群部署及问题

ZooKeeper为分布式应用系统提供了高性能服务,在许多常见的集群服务中被广泛使用,最常见的当属HBase集群了,其他的还有Solr集群.Hadoop-2中的HA自己主动故障转移等. 本文主要介绍了为HBase集群部署ZooKeeper集群的过程.并说明了部署过程中遇到的问题. 默认情况下,由HBase管理ZooKeeper的启动和停止.要想改动这一默认行为,须要将hbase-env.sh中的export HBASE_MANAGES_ZK=true改为export HBASE_MANAGES_

zookeeper的集群部署

1.上传安装包到集群服务器 2.解压 3.修改配置文件 进入zookeeper的安装目录的conf目录 cp zoo_sample.cfg zoo.cfg vi zoo.cfg # The number of milliseconds of each tick tickTime=2000 initLimit=10 syncLimit=5 dataDir=/root/zkdata clientPort=2181 #autopurge.purgeInterval=1 server.1=hdp20-0

Zookeeper+Kafka集群部署

Zookeeper+Kafka集群部署 主机规划: 10.200.3.85  Kafka+ZooKeeper 10.200.3.86  Kafka+ZooKeeper 10.200.3.87  Kafka+ZooKeeper 软件下载地址: #wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz #wget http://mirror.bit.edu.cn/apache/

Kafka集群部署

一. 关于kafka Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素. 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决. 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案.Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费. 关于Kafka的

kafka基础集群部署

kafka集群部署方案 ZooKeeper第一步主机名称到IP地址映射配置ZooKeeper集群中具有两个关键的角色Leader和Follower.集群中所有的结点作为一个整体对分布式应用提供服务集群中每个结点之间都互相连接所以在配置的ZooKeeper集群的时候每一个结点的host到IP地址的映射都要配置上集群中其它结点的映射信息.例如我的ZooKeeper集群中每个结点的配置以zk-01为例/etc/hosts内容如下所示:192.168.0.11   zk-01192.168.0.12  

4 kafka集群部署及生产者java客户端编程 + kafka消费者java客户端编程

本博文的主要内容有   kafka的单机模式部署 kafka的分布式模式部署 生产者java客户端编程 消费者java客户端编程 运行kafka ,需要依赖 zookeeper,你可以使用已有的 zookeeper 集群或者利用 kafka自带的zookeeper. 单机模式,用的是kafka自带的zookeeper, 分布式模式,用的是外部安装的zookeeper,即公共的zookeeper. Step 6: Setting up a multi-broker cluster So far w

大数据技术之_10_Kafka学习_Kafka概述+Kafka集群部署+Kafka工作流程分析+Kafka API实战+Kafka Producer拦截器+Kafka Streams

第1章 Kafka概述1.1 消息队列1.2 为什么需要消息队列1.3 什么是Kafka1.4 Kafka架构第2章 Kafka集群部署2.1 环境准备2.1.1 集群规划2.1.2 jar包下载2.2 Kafka集群部署2.3 Kafka命令行操作第3章 Kafka工作流程分析3.1 Kafka 生产过程分析3.1.1 写入方式3.1.2 分区(Partition)3.1.3 副本(Replication)3.1.4 写入流程3.2 Broker 保存消息3.2.1 存储方式3.2.2 存储策

3、Kafka集群部署

Kafka集群部署 1)解压安装包 [ip101]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/app/ 2)修改解压后的文件名称 [ip101]$ mv kafka_2.11-0.11.0.0/ kafka 3)在/opt/app/kafka目录下创建logs文件夹 [ip101]$ mkdir logs 4)修改配置文件 [ip101]$ cd config/ [[email protected] config]$ vi server.propert