logstash通过kafka传输nginx日志(三)

  单个进程 logstash 可以实现对数据的读取、解析和输出处理。但是在生产环境中,从每台应用服务器运行 logstash 进程并将数据直接发送到 Elasticsearch 里,显然不是第一选择:第一,过多的客户端连接对 Elasticsearch 是一种额外的压力;第二,网络抖动会影响到 logstash 进程,进而影响生产应用;第三,运维人员未必愿意在生产服务器上部署 Java,或者让 logstash 跟业务代码争夺 Java 资源。

  所以,在实际运用中,logstash 进程会被分为两个不同的角色。运行在应用服务器上的,尽量减轻运行压力,只做读取和转发,这个角色叫做 shipper;运行在独立服务器上,完成数据解析处理,负责写入 Elasticsearch 的角色,叫 indexer。

  Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。和Redis做轻量级消息队列不同,Kafka利用磁盘做消息队列,所以也就无所谓消息缓冲时的磁盘问题。生产环境中还是推荐使用Kafka做消息队列。此外,如果公司内部已经有 Kafka 服务在运行,logstash也可以快速接入,免去重复建设的麻烦。

一、Logstash搭建

  详细搭建可以参考Logstash安装搭建(一)

二、配置Shipper

  Shipper 即为Nginx服务器上运行的 logstash 进程,logstash 通过 logstash-input-file 写入,然后通过 logstash-output-kafka 插件将日志写入到 kafka 集群中。

  Logstash使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录在 .sincedb 的数据库文件来跟踪被监听的日志文件的当前读取位置。

.sincedb 文件中记录了每个被监听的文件的 inode, major number, minor number 和 pos。 

  Input配置实例

input {
  file {
    path => "/var/log/nginx/log_access.log"
    type => "nginx-access"
    discover_interval => 15    #logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒。
    sincedb_path => "/etc/logstash/.sincedb"    #定义sincedb文件的位置
    start_position => "beginning"    #定义文件读取的位置
  }
}

  其他配置详解:

exclude    不想被监听的文件可以排除出去。
close_older    已经监听的文件,若超过这个时间内没有更新,就关闭监听该文件的句柄。默认为:3600s,即一小时。
ignore_older    在每次检查文件列表时,若文件的最后修改时间超过该值,则忽略该文件。默认为:86400s,即一天。
sincedb_path    定义 .sincedb 文件路径,默认为 $HOME/.sincedb 。
sincedb_write_interval    间隔多久写一次sincedb文件,默认15s。
stat_interval    每隔多久检查被监听文件状态(是否有更新),默认为1s。
start_position    logstash从什么位置开始读取文件数据。默认为结束位置,类似 tail -f 的形式。设置为“beginning”,则从头读取,类似 cat ,到最后一行以后变成为 tail -f 。

  Output配置实例

  以下配置可以实现对 kafka producer 的基本使用。生产者更多详细配置请查看 Kafka 官方文档中生产者部分配置文档

output {
  kafka {
    bootstrap_servers => "localhost:9092"    #生产者
    topic_id => "nginx-access-log"    #设置写入kafka的topic
    compression_type => "snappy"    #消息压缩模式,默认是none,可选gzip、snappy。
  }
}

  logstash-out-kafka 其他配置详解:

compression_type    消息压缩模式,默认是none,有效值为:none、gzip、snappy。
asks    消息确认模式,默认为1,有效值为:0、1、all。设置为0,生产者不等待 broker 回应;设置为1,生产者会收到 leader 写入之后的回应;设置为all, leader 将要等待 in-sync 中所有的 replication 同步确认。
send_buffer_bytes    TCP发送数据时的缓冲区的大小。

  logstash-kafka 插件输入和输出默认 codec 为 json 格式。在输入和输出的时候注意下编码格式。消息传递过程中 logstash 默认会为消息编码内加入相应的时间戳和 hostname 等信息。如果不想要以上信息(一般做消息转发的情况下),可以使用以下配置,例如:

 output {
    kafka {
        codec => plain {
            format => "%{message}"
        }
    }
}

三、搭建配置Kafka

  搭建配置Kafka可以参考 Kafka集群搭建

四、配置Indexer

  是用logstash-input-kafka插件,从kafka集群中读取数据。

  Input配置示例:

input {
  kafka {
    zk_connect => "localhost:2181"    #zookeeper地址
    topic_id => "nginx-access-log"    #kafka中topic名称,记得创建该topic
    group_id => "nginx-access-log"     #默认为“logstash”
    codec => "plain"    #与Shipper端output配置项一致
    consumer_threads => 1    #消费的线程数
    decorate_events => true    #在输出消息的时候回输出自身的信息,包括:消费消息的大小、topic来源以及consumer的group信息。
    type => "nginx-access-log"
  }
}

  更多 logstash-input-kafka 配置可以从 logstash 官方文档 查看。

  Logstash 是一个 input | decode | filter | encode | output 的数据流。上述配置中有 codec => "plain" ,即logstash 采用转发的形式,不会对原有信息进行编码转换。丰富的过滤器插件(Filter)的存在是 logstash 威力强大的重要因素,提供的不单单是过滤的功能,可以进行复杂的逻辑处理,甚至无中生有添加新的logstash事件到后续的流程中去。这里只列举 logstash-output-elasticsearch 配置。

  logstash-output-elasticsearch 配置实例:

output {
    elasticsearch {
        hosts => ["localhost:9200"]    //Elasticsearch 地址,多个地址以逗号分隔。
        index => "logstash-%{type}-%{+YYYY.MM.dd}"    //索引命名方式,不支持大写字母(Logstash除外)
        document_type => "%{type}"    //文档类型
        workers => 1
        flush_size => 20000    //向Elasticsearch批量发送数据的条数
        idle_flush_time => 10    //向Elasticsearch批量发送数据的时间间隔,即使不满足 flush_size 也会发送
        template_overwrite => true    //设置为true,将会把自定义的模板覆盖logstash自带模板
    }
}

到此就已经把Nginx上的日志转发到Elasticsearch中。

时间: 2024-11-08 22:37:14

logstash通过kafka传输nginx日志(三)的相关文章

logstash将Kafka中的日志数据订阅到HDFS

前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式.但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉.那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年.一年甚至更长时间的大数据分析.下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs. 一:安装logstash(下载tar包安装也行,我直接yum装了) #y

ELK之logstash系统日志和nginx日志收集-4

logstash常用参数 1 path 是必须的选项,每一个file配置,都至少有一个path 2 exclude 是不想监听的文件,logstash会自动忽略该文件的监听.配置的规则与path类似,支持字符串或者数组,但是要求必须是绝对路径. 3 start_position 是监听的位置,默认是end,即一个文件如果没有记录它的读取信息,则从文件的末尾开始读取,也就是说,仅仅读取新添加的内容.对于一些更新的日志类型的监听,通常直接使用end就可以了:相反,beginning就会从一个文件的头

ELK实践(二):收集Nginx日志

Nginx访问日志 这里补充下Nginx访问日志使用的说明.一般在nginx.conf主配置文件里需要定义一种格式: log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for&qu

使用Nginx和Logstash以及kafka来实现网站日志采集的详细步骤和过程

使用Nginx和Logstash以及kafka来实现网站日志采集的详细步骤和过程 环境介绍: linux虚拟机3台,主机名分别为hadoop01.hadoop02和hadoop03; 在这3台虚拟机上分别部署了3个Zookeeper,这里Zookeeper的具体安装步骤不做介绍; 在这3台虚拟机上分别部署了3个kafka,这里kafka的具体安装步骤也不做介绍; 我们在hadoop02这台机器上安装一个Logstash,其安装过程非常简单,解压既可使用; ====================

ELK之八----Logstash结合kafka收集系统日志和nginx日志

一.logstash结合kafka收集系统日志和nginx日志 架构图: 环境准备: A主机:kibana.elasticsearch,有条件可以将两个服务器分开:192.168.7.100 B主机:logstash主机:192.168.7.101/nginx服务器也在此主机上 C主机:logstash主机:192.168.7.102 D主机:kafka/zookeeper:192.168.7.104 E主机:kafka/zookeeper:192.168.7.105 1.使用logstash-

ELK+kafka收集 Nginx与tomcat日志

ELK日志收集 ELK原理与介绍为什么用到ELK:一般我们需要进行日志分析场景:直接在日志文件中 grep.awk 就可以获得自己想要的信息.但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档.文本搜索太慢怎么办.如何多维度查询.需要集中化的日志管理,所有服务器上的日志收集汇总.常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问.一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定

logstash实现nginx日志status报警

elk搭建方法详见之前一篇文章http://blog.51cto.com/chentianwang/1915326废话不多说 环境介绍192.168.102.20 nginx logstash-agent6.1192.168.102.21 logstash-server6.1 redis-server 一,搭建测试的nginx环境配置文件如下 worker_processes 1; events { worker_connections 1024; } http { include mime.t

ElKstack-解决nginx日志url链接包含中文logstash报错问题

logstash报错现象 Trouble parsing json {:source=>"message", :raw=>"{\"@timestamp\":\"2016-05-30T14:51:27+08:00\",\"host\":\"10.139.48.166\",\"clientip\":\"180.109.110.203\",\"

logstash通过rsyslog对nginx的日志收集和分析

logstash通过rsyslog对nginx的日志收集和分析 http://bbotte.blog.51cto.com/6205307/1613571 logstash&elasticsearch&kibana的安装和配置 http://bbotte.blog.51cto.com/6205307/1614453  这一篇文章里面是以nginx打补丁的方式实现rsyslog把nginx的日志同步到logstash做分析,不过线上环境种种不一样,下面是把nginx的日志直接通过rsyslog