ELK 之数据收集传输过滤 Filebeat+Logstash 部署

本文与前文是有关联的,之前的两篇文章客官可以抬腿出门右转 导读ELK 之前端ELK 之分布式发
#前端和消息队列搞定之后,我们需要安装数据采集工具filebeats和数据过滤机运输工具Logstash,一般情况我们都使用filebeats 用来收集日志文件,我自定义了一个log文件,文件内容如下:
55.3.244.1 GET /index.html 15824 0.043
55.3.244.1 GET /index.html 15824 0.043
文件位置放到/tmp/test.log

cd /opt/elk/filebeat-6.2.2-linux-x86_64
#filebeat 的配置文件大体上分为两部分,输入部分和输出部分,输入指定输入源即可,输出可以选择直接到elasticsearch,logstash或者是消息队列(redis&kafka)等,本文采用kafka 作为消息队列;
vi filebeat.yml
filebeat.prospectors:
#日志类型

  • type: log
    enabled: True

    日志路径可以

    写多个,支持通配符
    paths:

    • /tmp/test.log
      #设置字符集编码
      encoding: utf-8
      #文档类型
      document_type: my-nginx-log
      #每十秒扫描一次
      scan_frequency: 10s

      实际读取文件时,每次读取 16384 字节

      harverster_buffer_size: 16384
      #The maximum number of bytes that a single log message can have. All bytes after max_bytes are discarded and not sent. This setting is especially useful for multiline log messages, which can get large. The default is 10MB (10485760). 一次发生的log大小值;
      max_bytes: 10485760

      是否从文件末尾开始读取

      tail_files: true
      #给日志加上tags方便在logstash过滤日志的时候做判断
      tags: ["nginx-access"]
      #剔除以.gz 结尾的文件
      exclude_files: [".gz$"]
      #output 部分的配置文档
      #配置输出为kafka,无论你是tar包还是rpm安装,目录里边会看到官方提供的filebeat.full.yml OR filebeat.reference.yml 这里边有filebeat 所有的input 方法和output 方法供你参考;
      output.kafka:
      enabled: true
      #kafka 的server,可以配置集群,例子如下:
      hosts:["ip:9092","ip2:9092","ip3:9092"]
      #这个非常重要,filebeat作为provider,把数据输入到kafka里边,logstash 作为消费者去消费这些信息,logstash的input 中需要这个topic,不然logstash没有办法取到数据。
      topic: elk-%{[type]}

      The number of concurrent load-balanced Kafka output workers. kafka 的并发运行进程

      worker: 2
      #当传输给kafka 有问题的时候,重试的次数;
      max_retries: 3
      #单个kafka请求里面的最大事件数,默认2048
      bulk_max_size: 2048
      #等待kafka broker响应的时间,默认30s
      timeout: 30s
      #kafka broker等待请求的最大时长,默认10s
      broker_timeout: 10s
      #每个kafka broker在输出管道中的消息缓存数,默认256
      channel_buffer_size: 256
      #网络连接的保活时间,默认为0,不开启保活机制
      keep_alive: 60
      #输出压缩码,可选项有none, snappy, lz4 and gzip,默认为gzip (kafka支持的压缩,数据会先被压缩,然后被生产者发送,并且在服务端也是保持压缩状态,只有在最终的消费者端才会被解压缩)
      compression: gzip
      #允许的最大json消息大小,默认为1000000,超出的会被丢弃,应该小于broker的??message.max.bytes(broker能接收消息的最大字节数)
      max_message_bytes: 1000000
      #kafka的响应返回值,0位无等待响应返回,继续发送下一条消息;1表示等待本地提交(leader broker已经成功写入,但follower未写入),-1表示等待所有副本的提交,默认为1
      required_acks: 0
      #The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats"。客户端ID 用于日志怕错,审计等,默认是beats。
      client_id: beats
      #测试配置文件:/opt/elk/filebeat/filebeat -c /opt/elk/filebeat/filebeat.yml test config
      #如果配置文件没有问题的话,会出现config ok ,如果有问题会提示具体问题在哪里。
      #启动filebeat
      可以先通过 /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c -e /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml 查看一下输入filebeat是否工作正常,会有很多信息打印到屏幕上;
      nohup /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat -c /opt/elk/filebeat-6.2.2-linux-x86_64/filebeat.yml >>/dev/null 2>&1&

logstash 安装配置:
input {
#数据来源
kafka {
#这个对应filebeat的output 的index
topics_pattern => "elk-.*"
#kafka 的配置
bootstrap_servers => "IP1:9092,IP2:9092,IP3:9092"
kafka 中的group ID
group_id => "logstash-g1"
}
}
#过滤器,如果你想要日志按照你的需求输出的话,需要在logstash中过滤并且给与相应的key,比如以下的是nginx 日志,用的是logstash内置好的过滤器,%{IP:client} 这个里边包含了两个信息:ip地址,client 是在kibana 中显示的自定义键值,最终显示成这样,http://grokdebug.herokuapp.com/ 是在线调试logstash filter的工具,你可以在线调试你的过滤规则。

filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"}
#因为我们取出了字段,所以不需要原来这个message字段,这个字段里边包含之前beat 输入的所有字段。
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["IP:9200"]
#这个是在kibana上的index Patterns 的索引,建议什么服务就用干什么名字,因为beat 提供了些kibana的模板可以导入,导入的模板匹配的时候用的索引是对应服务的名称开头 。
index => "nginx-%{+YYYY.MM.dd}"
document_type => "nginx"
#每次20000 发送一次数据到elasticsearch
flush_size => 20000
如果不够20000,没10秒会发送一次数据;、
idle_flush_time =>10
}
}

日志从进入到输出的流程图:

Kibana 上边的日志通过logstash 过滤之后取出来在kibana上显示如下:

原文地址:http://blog.51cto.com/seekerwolf/2106509

时间: 2024-08-28 20:55:12

ELK 之数据收集传输过滤 Filebeat+Logstash 部署的相关文章

海量日志下的日志架构优化:filebeat+logstash+kafka+ELK

前言: 实验需求说明 在前面的日志收集中,都是使用的filebeat+ELK的日志架构.但是如果业务每天会产生海量的日志,就有可能引发logstash和elasticsearch的性能瓶颈问题.因此改善这一问题的方法就是filebeat+logstash+kafka+ELK,也就是将存储从elasticsearch转移给消息中间件,减少海量数据引起的宕机,降低elasticsearch的压力,这里的elasticsearch主要进行数据的分析处理,然后交给kibana进行界面展示 实验架构图:

ELK 做日志分析(filebeat+logstash+elasticsearch)配置

利用 Filebeat去读取日志发送到 Logstash ,再由 Logstash 处理后发送给 Elasticsearch . 一.Filebeat 项目日志文件: 利用 Filebeat 去读取文件,paths 下面配置路径地址,Filebeat 会自动去读取 /data/share/business_log/TA-*/debug.log 文件 #=========================== Filebeat prospectors ========================

ELK架构下利用Kafka Group实现Logstash的高可用

系统运维的过程中,每一个细节都值得我们关注 下图为我们的基本日志处理架构 所有日志由Rsyslog或者Filebeat收集,然后传输给Kafka,Logstash作为Consumer消费Kafka里边的数据,分别写入Elasticsearch和Hadoop,最后使用Kibana输出到web端供相关人员查看,或者是由Spark接手进入更深层次的分析 在以上整个架构中,核心的几个组件Kafka.Elasticsearch.Hadoop天生支持高可用,唯独Logstash是不支持的,用单个Logsta

ELK:日志收集分析平台

目录 简介 环境说明 Filebeat 部署 web上采集配置文件 app上采集配置文件 Redis 部署 配置文件 Logstash 部署 Elasticsearch 集群部署 配置文件 Kibana 部署 参考文档 简介 ELK是一个日志收集分析的平台,它能收集海量的日志,并将其根据字段切割.一来方便供开发查看日志,定位问题:二来可以根据日志进行统计分析,通过其强大的呈现能力,挖掘数据的潜在价值,分析重要指标的趋势和分布等,能够规避灾难和指导决策等.ELK是Elasticsearch公司出品

ELK日志分析收集

日志的作用:系统运维和开发人员可以通过日志了解服务器软硬件,检查配置过程当中发生的错误和错误原因.以便了解服务器的负荷,性能的安全性,从而及时的采取措施纠正错误. 日志的功能:解决系统故障和发现问题的主要手段. 日志主要包括:系统日志,应用程序日志,安全日志. 集中化管理日志.对于日志的统计和检索是一件比较麻烦的事情.一般使用grep ,awk和wc等Linux命令去实现检索和统计.但是对于查询,排序和统计,就显得有些力不存心.而ELK正好能解决上述问题 ELK是开源分布式的搜索的引擎.它是由E

网站统计中的数据收集原理及实现

转载自:http://blog.sina.com.cn/s/blog_62b832910102w5mx.html Avinash Kaushik将点击流数据的获取方式分为4种:log files.web beacons.JavaScript tags和packet sniffers,其中包嗅探器(packet sniffers)比较不常见,最传统的获取方式是通过WEB日志文件(log files):而beacons和JavaScript是目前较为流行的方式,Google Analytics目前就

万能日志数据收集器 Fluentd - 每天5分钟玩转 Docker 容器技术(91)

前面的 ELK 中我们是用 Filebeat 收集 Docker 容器的日志,利用的是 Docker 默认的 logging driver json-file,本节我们将使用 fluentd 来收集容器的日志. Fluentd 是一个开源的数据收集器,它目前有超过 500 种的 plugin,可以连接各种数据源和数据输出组件.在接下来的实践中,Fluentd 会负责收集容器日志,然后发送给 Elasticsearch.日志处理流程如下: 这里我们用 Filebeat 将 Fluentd 收集到的

filebeat + logstash + elasticsearch + granfa

filebeat + logstash + elasticsearch + granfa https://www.cnblogs.com/wenchengxiaopenyou/p/9034213.html 一.背景 前端web服务器为nginx,采用filebeat + logstash + elasticsearch + granfa 进行数据采集与展示,对客户端ip进行地域统计,监控服务器响应时间等. 二.业务整体架构: nginx日志落地-->filebear-->logstash--&

网络TCp数据的传输设计(黏包处理)

//1.该片为引用别人的文章:http://www.cnblogs.com/alon/archive/2009/04/16/1437599.html 解决TCP网络传输"粘包"问题 解决TCP网络传输"粘包"问题 作者:杨小平 王胜开 原文出处:http://www.ciw.com.cn/ 当前在网络传输应用中,广泛采用的是TCP/IP通信协议及其标准的socket应用开发编程接口(API).TCP/IP传输层有两个并列的协议:TCP和UDP.其中TCP(trans