Kafka+ELK完成日志采集处理

此文档为了做一次记录,按回忆粗略补写。

环境信息

Centos      V7.6.1810

JDK     V1.8.0_171

Rsyslog    V8.24.0-34.el7

Kafka     V2.12-0.10.2.1

zookeeper  V3.4.10

ELK    V6.2.3

服务器分配

配置尽量高点,此次部署kafka+zookeeper和ES皆为集群模式。

服务器名 IP地址 配置 备注
node1 192.168.101.55 CPU:2C 内存:4G 磁盘:100G  
node2 192.168.101.56 CPU:2C 内存:4G 磁盘:100G  
node3 192.168.101.57 CPU:2C 内存:4G 磁盘:100G  

此文档主要以部署为主,部署的时候遇到很多问题,忘做记录了。

一、环境配置(三台机器同样操作)

如果关闭防火墙那就算了。否则需要配置以下策略。

1、firewall每台机器加一条策略[[email protected] home]# firewall-cmd --permanent --add-rich-rule="rule family="ipv4" source address="192.168.101.1/24" accept"# 此条作用就是打通101网段允许访问

查看防火墙:[[email protected] home]# firewall-cmd --list-all
public (active)
  target: default
  icmp-block-inversion: no
  interfaces: eth0
  sources:
  services: ssh dhcpv6-client
  ports:
  protocols:
  masquerade: no
  forward-ports:
  source-ports:
  icmp-blocks:
  rich rules:
        rule family="ipv4" source address="192.168.101.1/24" accept

注:为了部署不出问题,最好telnet测试一直是否生效可用。

2、关闭selinux

[[email protected] home]# vim /etc/sysconfig/selinux
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of three values:
# targeted - Targeted processes are protected,
# minimum - Modification of targeted policy. Only selected processes are protected.
# mls - Multi Level Security protection.
SELINUXTYPE=targeted

3、设置JAVA环境变量

[[email protected] home]# mkdir /home/jdk  #此处是个人习惯,我喜欢放到/home下

[[email protected] home]# tar xf jdk-8u171-linux-x64.tar.gz -C /home/jdk/

[[email protected] home]# vim /etc/profile
...在最下面加入这行

export JAVA_HOME=/home/jdk/jdk1.8.0_171
export JRE_HOME=/home/jdk/jdk1.8.0_171/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$PATH

[[email protected] home]# source /etc/profile

# 检查环境变量是否生效
[[email protected] opt]# java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

4、加两条系统优化参数(因为后面ES服务会用到,所以别说那么多加上吧。)

[[email protected] opt]# vim /etc/sysctl.conf
vm.max_map_count=262144
[[email protected] opt]# vim /etc/security/limits.conf

...最下面加
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536

二、Kafka+Zookeeper集群部署

1、Zookeeper[[email protected] opt]# mkdir /home/zookeeper[[email protected] opt]# tar xf zookeeper-3.4.10.tar.gz -C /home/zookeeper/ && cd /home/zookeeper/zookeeper-3.4.10[[email protected] zookeeper-3.4.10]# vim conf/zoo.cfg

  tickTime=2000
  initLimit=10
  syncLimit=10
  dataDir=/home/zookeeper/data
  dataLogDir=/home/zookeeper/log
  clientPort=2181
  server.1=192.168.101.55:2888:3888
  server.2=192.168.101.56:2888:3888
  server.3=192.168.101.57:2888:3888

 注:echo "1" > /home/zookeeper/data/myid 三台机器上必须都要创建myid文件。看着点,1~3节点ID是不一样的(按上面配置server.*去每台机器做配置)

# 批量拷贝文件到各节点
   [[email protected] zookeeper-3.4.10]# for i in {55, 56, 57};do scp conf/zoo.cfg [email protected]$i:/home/zookeeper/conf/ ;done

重要事说三遍:每台机器都要做myid

 启动三台zookeeper服务

 # 报什么先不用管,启动完在讲 
 [[email protected] zookeeper-3.4.10]# bin/zkServer.sh start

 # 每台机器都执行一下,总会有一个leader(无报错则启动完成。有报错先看日志。。。日志。。。日志)
 [[email protected] zookeeper-3.4.10]# bin/zkServer.sh status
 ZooKeeper JMX enabled by default
 Using config: /home/zookeeper/zookeeper-3.4.10/bin/../conf/zoo.cfg
 Mode: follower

 2、kafka
 [[email protected] opt]# mkdir /home/kafka
  # 先备份
 [[email protected] opt]# tar xf kafka_2.12-0.10.2.1.tgz -C /home/kafka/ && cd /home/kafka/kafka_2.12-0.10.2.1/

  #每个节点都要改(标红的哈),别忘了。
  broker.id=1
  delete.topic.enable=true
  listeners=PLAINTEXT://192.168.101.55:9092
  num.network.threads=4
  num.io.threads=8
  socket.send.buffer.bytes=102400
  socket.receive.buffer.bytes=102400
  socket.request.max.bytes=104857600
  log.dirs=/home/kafka/kafka-logs
  num.partitions=3
  num.recovery.threads.per.data.dir=1
  log.retention.hours=168
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=300000
  zookeeper.connect=192.168.101.55:2181,192.168.101.56:2181,192.168.101.57:2181
  zookeeper.connection.timeout.ms=6000 

 # 是时候启动了
 [[email protected] kafka_2.12-0.10.2.1]# nohup bin/kafka-server-start.sh config/server.properties &

 # 创建一个topic测试
 bin/kafka-topics.sh --create --topic tg_system_log --zookeeper 192.168.101.55:2181,192.168.101.56:2181,192.168.101.57:2181 --partitions 3 --replication-factor 1

 # 创建一个生产者

 bin/kafka-console-producer.sh --broker-list 192.169.101.57:9092 --topic tg_system_log
 # 创建一个消费者

 bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.57:9092 --topic tg_system_log

 注:生产者里发消息,消费者如果有接收那这个架构也部署完成了。(有问题请先看日志。。。日志。。。日志)

三、配置Logstash(数据采集)

1、检查安装包(两个包必须都要有)

[[email protected] kafka_2.12-0.10.2.1]# tar xf logstash-6.2.3.tar.gz -C /home && cd /home

# 创建此采集文件,本次案例采集的message和docker日志(注意标红点)
[[email protected] logstash-6.2.3]# vim conf/system_up.conf

input {
  file {
    path => "/var/log/messages"
    start_position => "beginning"
    type => "system-log"
    discover_interval => 2
  }
  file {
    path => "/var/lib/docker/containers/*/*-json.log"
    start_position => "beginning"
    type => "docker-log"
    discover_interval => 2
  }
}
output {
  if [type] == "system-log" {
    kafka {
      bootstrap_servers => "192.168.101.55:9092"
      topic_id => "tg_system_log"
      compression_type => "snappy"
    }
  }
  else if [type] == "docker-log" {
    kafka {
      bootstrap_servers => "192.168.101.55:9092"
      topic_id => "tg_docker_log"
      compression_type => "snappy"
    }
  }
}

[[email protected] kafka_2.12-0.10.2.1]# systemctl start rsyslog.service

# 功能测试

[[email protected] kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.55:9092 --topic tg_system_log

# 为了快捷手动创建日志

向/var/log/message里插数据,看topic里是否有数据,如果有则配置成功。

四、配置ES(node1~node3都要配置)

注:创建一个普通用户,把包放到该用户下。1、切记需要安装x-pack

[[email protected] es]$ elasticsearch-6.2.3/bin/elasticsearch-plugin install file:///home/cube/es/x-pack-6.2.3.zip (三台机器要安装)


[[email protected] es]$ vim elasticsearch-6.2.3/config/elasticsearch.yml

cluster.name: master-cluster
node.name: node1 (三台机器要改动)
node.master: true
node.data: true
path.data: /home/cube/es/elasticsearch-6.2.3/data
path.logs: /home/cube/es/elasticsearch-6.2.3/log
network.host: 192.168.101.55 (三台机器要改动)
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.101.55", "192.168.101.57", "192.168.101.57"]

# 选举时需要的节点连接数
discovery.zen.minimum_master_nodes: 2
# 一个节点多久ping一次,默认1s
discovery.zen.fd.ping_interval: 1s
# 等待ping返回时间,默认30s
discovery.zen.fd.ping_timeout: 10s
# ping超时重试次数,默认3次
discovery.zen.fd.ping_retries: 3

 2、启动检测(三台都要启动)

[[email protected] elasticsearch-6.2.3]$ bin/elasticsearch -d

# 查看master-cluster.log日志,无报错则启动无问题

 3、设置密码

[[email protected] ~]$ elasticsearch-6.2.4/bin/x-pack/setup-passwords interactive

五、配置kibana

 1、安装x-pack

[[email protected] kibana]# bin/kibana-plugin install file:///home/kibana/x-pack-6.2.3.zip

 2、修改配置kibana.yml

[[email protected] kibana]# vim config/kibana.yml

server.port: 5601
server.host: "192.168.101.55"
elasticsearch.url: "http://192.168.101.55:9200"
elasticsearch.username: "elastic"
elasticsearch.password: "elastic"

 3、启动kibana

[[email protected] kibana]# bin/kibana

# 浏览器打开URL:http://192.168.101.55:5601

# 登录后找Monitoring>>Nodes:3可以看到ES的节点数。

六、配置Logstash(数据整合中间件)

1、创建conf目录,然后在里面创建kafka_to_es.conf文件[[email protected] logstash-6.2.3]# vim conf/kafka_to_es.conf

input {
  kafka {
    bootstrap_servers => ["192.168.101.55:9082"]
    topics => ["tg_system_log"]
    codec => "json"
    type => "system_log"
    consumer_threads => 5
    decorate_events => true
  }
  kafka {
    bootstrap_servers => ["192.168.101.55:9082"]
    topics => ["tg_docker_log"]
    codec => "json"
    type => "docker_log"
    consumer_threads => 5
    decorate_events => true
  }
}
output {
  if [type] == "system_log"{
    elasticsearch {
    hosts => ["192.168.101.55:9200","192.168.101.56:9200","192.168.101.56:9200"]
    index => "systems-logs-%{+YYY.MM.dd}"
    user => elastic
    password => elastic
    }
  }
  else if [type] == "docker_log" {
    elasticsearch {
    hosts => ["192.168.101.55:9200","192.168.101.56:9200","192.168.101.56:9200"]
    index => "dockers-logs-%{+YYY.MM.dd}"
    user => elastic
    password => elastic
   }
  }
}

 这里直接启动logstash即可

七、打开kibana页面

点开Management>>index Patterns创建一个新的Index这里会出现中间件output的index配置名字。直接创建index即可。到此配置已完在。

补充内容:

本来想着用fluentd把docker输出日志传到kafka,但是没成功这里直接传到ES,后续在研究吧。或许有其他大神完成也可以分享一下文档我学习一下。

1、配置fluentd服务

[[email protected] ~]# rpm -qa | grep td-agent
td-agent-3.4.0-0.el7.x86_64

 2、需要先安装fluent-plugin-elasticsearch(更新ruby2.5 看下面文献)

[[email protected] ~]# gem install fluent-plugin-elasticsearch

[[email protected] ~]# vim /etc/td-agent/td-agent.conf

<source>
@type debug_agent
@id input_debug_agent
bind 127.0.0.1
port 24230
</source>
<match docker.**>
type stdout
</match>
<match nginx-test.**>
type elasticsearch
host 192.168.101.55
port 9200
user elastic
password elastic
logstash_format true
logstash_prefix docker
logstash_dateformat %Y_%m
index_name docker_log
flush_interval 5s
type_name docker
include_tag_key true
</match>

3、启动docker

docker run -d --log-driver fluentd --log-opt fluentd-address=localhost:24224 --log-opt tag="nginx-test" --log-opt fluentd-async-connect --name nginx-test -p 9080:80 nginx

其他按第七步操作。

更新ruby看:https://blog.csdn.net/qq_26440803/article/details/82717244

其他文献:https://blog.csdn.net/qq_26440803/article/details/82717244

原文地址:https://www.cnblogs.com/TaleG/p/10863585.html

时间: 2024-08-30 05:05:06

Kafka+ELK完成日志采集处理的相关文章

2018年ElasticSearch6.2.2教程ELK搭建日志采集分析系统(教程详情)

章节一 2018年 ELK课程计划和效果演示1.课程安排和效果演示简介:课程介绍和主要知识点说明,ES搜索接口演示,部署的ELK项目演示es: localhost:9200kibana http://localhost:5601/ 章节二 elasticSearch 6.2版本基础讲解到阿里云部署实战 2.搜索引擎知识介绍和相关框架简介:介绍搜索的基本概念,市面上主流的搜索框架elasticSearch和solr等对比什么是搜索:在海量信息中获取我们想要的信息传统做法:1.文档中使用系统的Fin

2018年ElasticSearch6.2.2教程ELK搭建日志采集分析系统(目录)

章节一  2018年 ELK课程计划和效果演示 1.课程安排和效果演示 简介:课程介绍和主要知识点说明,ES搜索接口演示,部署的ELK项目演示 章节二 elasticSearch 6.2版本基础讲解到阿里云部署实战 2.搜索引擎知识介绍和相关框架 简介:介绍搜索的基本概念,市面上主流的搜索框架elasticSearch和solr等对比 什么是搜索:在海量信息中获取我们想要的信息 3.新版本 elasticSearch 6.1.2介绍 简介:介绍ES的主要特点和使用场景,新特性讲解 4.windo

windows下kafka+ELK的日志系统

用到的软件:zookeeper.kafka.logstash(6.3.2版本).ES(6.3.2版本).Kibana(6.3.2版本).具体安装步骤不在此说明,基本都是下载解压,改一下配置文件,即可使用.(以下所述均在Windows下)1.zookeeper:kafka中自带zookeeper,可以不用装zookeeper,如果想自己另装,需配置环境变量,如下:ZOOKEEPER_HOME => D:\nomalAPP\zookeeper-3.4.13path 里面加入 %ZOOKEEPER_H

基于Flume+LOG4J+Kafka的日志采集架构方案

本文将会介绍如何使用 Flume.log4j.Kafka进行规范的日志采集. Flume 基本概念 Flume是一个完善.强大的日志采集工具,关于它的配置,在网上有很多现成的例子和资料,这里仅做简单说明不再详细赘述.Flume包含Source.Channel.Sink三个最基本的概念: Source——日志来源,其中包括:Avro Source.Thrift Source.Exec Source.JMS Source.Spooling Directory Source.Kafka Source.

使用Nginx和Logstash以及kafka来实现网站日志采集的详细步骤和过程

使用Nginx和Logstash以及kafka来实现网站日志采集的详细步骤和过程 环境介绍: linux虚拟机3台,主机名分别为hadoop01.hadoop02和hadoop03; 在这3台虚拟机上分别部署了3个Zookeeper,这里Zookeeper的具体安装步骤不做介绍; 在这3台虚拟机上分别部署了3个kafka,这里kafka的具体安装步骤也不做介绍; 我们在hadoop02这台机器上安装一个Logstash,其安装过程非常简单,解压既可使用; ====================

Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日志采集方案

前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产生协同作用,而且 logstash比filebeat功能更强大一点,2个都使用是因为:Beats 是一个轻量级的采集器,支持从边缘机器向 Logstash 和 Elasticsearch 发送数据.考虑到 Logstash 占用系 统资源较多,我们采用 Filebeat 来作为我们的日志采集器.并且

基于Kafka+ELK搭建海量日志平台

早在传统的单体应用时代,查看日志大都通过SSH客户端登服务器去看,使用较多的命令就是 less 或者 tail.如果服务部署了好几台,就要分别登录到这几台机器上看,等到了分布式和微服务架构流行时代,一个从APP或H5发起的请求除了需要登陆服务器去排查日志,往往还会经过MQ和RPC调用远程到了别的主机继续处理,开发人员定位问题可能还需要根据TraceID或者业务唯一主键去跟踪服务的链路日志,基于传统SSH方式登陆主机查看日志的方式就像图中排查线路的工人一样困难,线上服务器几十上百之多,出了问题难以

Kafka+Zookeeper+Filebeat+ELK 搭建日志收集系统

ELK ELK目前主流的一种日志系统,过多的就不多介绍了 Filebeat收集日志,将收集的日志输出到kafka,避免网络问题丢失信息 kafka接收到日志消息后直接消费到Logstash Logstash将从kafka中的日志发往elasticsearch Kibana对elasticsearch中的日志数据进行展示 image 环境介绍: 软件版本: - Centos 7.4 - java 1.8.0_45 - Elasticsearch 6.4.0 - Logstash 6.4.0 - F

海量日志下的日志架构优化:filebeat+logstash+kafka+ELK

前言: 实验需求说明 在前面的日志收集中,都是使用的filebeat+ELK的日志架构.但是如果业务每天会产生海量的日志,就有可能引发logstash和elasticsearch的性能瓶颈问题.因此改善这一问题的方法就是filebeat+logstash+kafka+ELK,也就是将存储从elasticsearch转移给消息中间件,减少海量数据引起的宕机,降低elasticsearch的压力,这里的elasticsearch主要进行数据的分析处理,然后交给kibana进行界面展示 实验架构图: