filebeat-2-通过kafka队列链接logstash

filebeat 直接到logstash, 由于logstash的设计问题, 可能会出现阻塞问题, 因为中间使用消息队列分开

可以使用redis, 或者kafka, 这儿使用的是kafka

1, 安装

kafka的安装, 解压可用, 但需要zookeeper, 内置了一个zookeeper, 直接使用即可

1), 启动内置zookeeper

./bin/zookeeper-server-start.sh ./config/zookeeper.properties &

2), 修改kafka的配置文件

vim ./conf/server.properties

############################# Server Basics #############################
broker.id=0
delete.topic.enable=true

############################# Socket Server Settings #############################
listeners=PLAINTEXT://0.0.0.0:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################
log.flush.interval.messages=10000
log.flush.interval.ms=1000

############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

3), 启动kafkaserver

/bin/kafka-server-start.sh ./config/server.properties &

4),修改filebeat文件, 最终形态

cat ./elk/filebeat-5.5.2-linux-x86_64/filebeat.yml | grep -v ‘#‘ | grep -v ‘^$‘
filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/nginx/*.log
  encoding: utf-8
  document_type: my-nginx-log
  scan_frequency: 5s
  harvester_buffer_size: 16384
  max_bytes: 10485760
  tail_files: true
output.kafka:
  enabled: true
  hosts: ["www.wenbronk.com:9092"]
  topic: elk-%{[type]}
  worker: 2
  max_retries: 3
  bulk_max_size: 2048
  timeout: 30s
  broker_timeout: 10s
  channel_buffer_size: 256
  keep_alive: 60
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 0
  client_id: beats

5), 重新启动filebeat

./filebeat -c ./filebeat.yml &

6), 修改 logstash的input

input {
    kafka  {
      #codec => "json"
      topics_pattern => "elk-.*"
      bootstrap_servers => "127.0.0.1:9092"
      auto_offset_reset => "latest"
      group_id => "logstash-g1"
    }
}
output {
    elasticsearch {                                  #Logstash输出到elasticsearch;
      hosts => ["localhost:9200"]                    #elasticsearch为本地;
      index => "logstash-nginx-%{+YYYY.MM.dd}"       #创建索引;
      document_type => "nginx"                       #文档类型;
      workers => 1                                   #进程数量;
      user => elastic                                #elasticsearch的用户;
      password => changeme                           #elasticsearch的密码;
      flush_size => 20000
      idle_flush_time => 10
 }
}

7), 重启logstash

8 ), 页面访问 nginx, 可以查看消息队列中的消息

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic elk-log -m-beginning

参考: http://www.ywnds.com/?p=9776

时间: 2024-08-24 13:09:21

filebeat-2-通过kafka队列链接logstash的相关文章

logback KafkaAppender 写入Kafka队列,集中日志输出.

为了减少应用服务器对磁盘的读写,以及可以集中日志在一台机器上,方便使用ELK收集日志信息,所以考虑做一个jar包,让应用集中输出日志 网上搜了一圈,只发现有人写了个程序在github 地址:https://github.com/johnmpage/logback-kafka 本来打算引用一下这个jar就完事了,没想到在pom里下不下来,只好把源码下了,拷贝了代码过来,自己修改一下. 首先,安装一个Kafka,作为一个懒得出神入化得程序员,我选择的安装方式是 启动zookeeper容器 docke

ELK架构下利用Kafka Group实现Logstash的高可用

系统运维的过程中,每一个细节都值得我们关注 下图为我们的基本日志处理架构 所有日志由Rsyslog或者Filebeat收集,然后传输给Kafka,Logstash作为Consumer消费Kafka里边的数据,分别写入Elasticsearch和Hadoop,最后使用Kibana输出到web端供相关人员查看,或者是由Spark接手进入更深层次的分析 在以上整个架构中,核心的几个组件Kafka.Elasticsearch.Hadoop天生支持高可用,唯独Logstash是不支持的,用单个Logsta

消息队列 链接

NAMEmq_overview —— POSIX消息队列概述 DESCRIPTIONPOSIX消息队列允许进程以消息的形式交换数据.此API与System V消息队列(msgget(2),msgsnd(2),msgrcv(2)等)有明显不同,但做的事情差不多. 消息队列通过mq_open(3)创建和打开,此函数返回一个消息队列描述符(mqd_t),它用于之后的调用中引用打开的消息队列.每个消息队列由一个名字标识,该名字具有这样的格式/somename,亦即,一个空字符结尾,以斜线开头,最多跟着N

Logstash过滤分析日志数据/kibanaGUI调试

[Logstash] [[email protected] ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz [[email protected] ~]# tar zxvf logstash-6.3.2.tar.gz -C /usr/local/ [[email protected] ~]# mv /usr/local/logstash-6.3.2/ /usr/local/logstash

FileBeat发送日志到logstash、ES、kafka

一.基本概念 简单概述 最近在了解ELK做日志采集相关的内容,这篇文章主要讲解通过filebeat来实现日志的收集.日志采集的工具有很多种,如fluentd, flume, logstash,betas等等.首先要知道为什么要使用filebeat呢?因为logstash是jvm跑的,资源消耗比较大,启动一个logstash就需要消耗500M左右的内存,而filebeat只需要10来M内存资源.常用的ELK日志采集方案中,大部分的做法就是将所有节点的日志内容通过filebeat送到kafka消息队

海量日志下的日志架构优化:filebeat+logstash+kafka+ELK

前言: 实验需求说明 在前面的日志收集中,都是使用的filebeat+ELK的日志架构.但是如果业务每天会产生海量的日志,就有可能引发logstash和elasticsearch的性能瓶颈问题.因此改善这一问题的方法就是filebeat+logstash+kafka+ELK,也就是将存储从elasticsearch转移给消息中间件,减少海量数据引起的宕机,降低elasticsearch的压力,这里的elasticsearch主要进行数据的分析处理,然后交给kibana进行界面展示 实验架构图:

redis代替kafka做缓存队列

前言:刚上线elk一个月左右,使用的kafka作为缓存队列,但是不知道为何,中间发生过好几次,elk突然没数据了,长达好几天都没有,      折腾了好久,好了,过几天又发生同样的状况.经查找,数据是有到达kafka,但是logstash读取不了.无奈之下,只能把kafka      更换为redis.      filebeat配置-------------filebeat:  prospectors:    -      document_type: "web-hkgf-proxy-ngin

filebeat+logstash+elasticsearch收集haproxy日志

filebeat用于是日志收集,感觉和 flume相同,但是用go开发,性能比较好 在2.4版本中, 客户机部署logstash收集匹配日志,传输到 kafka,在用logstash 从消息队列中抓取日志存储到elasticsearch中. 但是在 5.5版本中,使用filebeat 收集日志,减少对客户机的性能影响, filebeat 收集日志 传输到 logstash的 5044端口, logstash接收日志,然后传输到es中 实验 filebeat ---- kafka ------lo

ELK 之数据收集传输过滤 Filebeat+Logstash 部署

本文与前文是有关联的,之前的两篇文章客官可以抬腿出门右转 导读,ELK 之前端,ELK 之分布式发#前端和消息队列搞定之后,我们需要安装数据采集工具filebeats和数据过滤机运输工具Logstash,一般情况我们都使用filebeats 用来收集日志文件,我自定义了一个log文件,文件内容如下:55.3.244.1 GET /index.html 15824 0.04355.3.244.1 GET /index.html 15824 0.043文件位置放到/tmp/test.log cd /