logstash将Kafka中的日志数据订阅到HDFS

前言:通常情况下,我们将Kafka的日志数据通过logstash订阅输出到ES,然后用Kibana来做可视化分析,这就是我们通常用的ELK日志分析模式。但是基于ELK的日志分析,通常比较常用的是实时分析,日志存个十天半个月都会删掉。那么在一些情况下,我需要将日志数据也存一份到我HDFS,积累到比较久的时间做半年、一年甚至更长时间的大数据分析。下面就来说如何最简单的通过logstash将kafka中的数据订阅一份到hdfs。

一:安装logstash(下载tar包安装也行,我直接yum装了)

#yum install logstash-2.1.1

二:从github上克隆代码

#git clone  https://github.com/heqin5136/logstash-output-webhdfs-discontinued.git
#ls
logstash-output-webhdfs-discontinued

三:安装logstash-output-webhdfs插件

#cd logstash-output-webhdfs-discontinued
logstash的bin目录下有个plugin,使用plugin来安装插件
#/opt/logstash/bin/plugin install logstash-output-webhdfs

四:配置logstash

#vim /etc/logstash/conf.d/logstash.conf
input {
  kafka {
    zk_connect => ‘10.10.10.1:2181,10.10.10.2:2181,10.10.10.3:2181‘   #kafka的zk集群地址
    group_id => ‘hdfs‘                     #消费者组,不要和ELK上的消费者一样
    topic_id => ‘apiAppWebCms-topic‘       #topic 
    consumer_id => ‘logstash-consumer-10.10.8.8‘   #消费者id,自定义,我写本机ip。
    consumer_threads => 1
    queue_size => 200
    codec => ‘json‘
  }
}

output {            
#如果你一个topic中会有好几种日志,可以提取出来分开存储在hdfs上。
if [type] == "apiNginxLog" {
    webhdfs {
           workers => 2
           host => "10.10.8.1"        #hdfs的namenode地址    
           port => 50070              #webhdfs端口
           user => "hdfs"             #hdfs运行的用户啊,以这个用户的权限去写hdfs。
           path => "/data/logstash/apiNginxLog-%{+YYYY}-%{+MM}-%{+dd}/logstash-%{+HH}.log 
             #按天建目录,按小时建log文件。
           flush_size => 500
#       compression => "snappy"             #压缩格式,可以不压缩
        idle_flush_time => 10
        retry_interval => 0.5
       }
   }
if [type] == "apiAppLog" {
    webhdfs {
        workers => 2
        host => "10.64.8.1"
        port => 50070
        user => "hdfs"
        path => "/data/logstash/api/apiAppLog-%{+YYYY}-%{+MM}-%{+dd}.log"
        flush_size => 500
#        compression => "snappy"
        idle_flush_time => 10
        retry_interval => 0.5
       }
   }
  stdout { codec => rubydebug }
}

五:启动logstash

#/etc/init.d/logstash start

已经可以成功写入了。

时间: 2024-08-08 18:17:10

logstash将Kafka中的日志数据订阅到HDFS的相关文章

flume学习(三):flume将log4j日志数据写入到hdfs(转)

原文链接:flume学习(三):flume将log4j日志数据写入到hdfs 在第一篇文章中我们是将log4j的日志输出到了agent的日志文件当中.配置文件如下: [plain] view plaincopy tier1.sources=source1 tier1.channels=channel1 tier1.sinks=sink1 tier1.sources.source1.type=avro tier1.sources.source1.bind=0.0.0.0 tier1.sources

logstash通过kafka传输nginx日志(三)

单个进程 logstash 可以实现对数据的读取.解析和输出处理.但是在生产环境中,从每台应用服务器运行 logstash 进程并将数据直接发送到 Elasticsearch 里,显然不是第一选择:第一,过多的客户端连接对 Elasticsearch 是一种额外的压力:第二,网络抖动会影响到 logstash 进程,进而影响生产应用:第三,运维人员未必愿意在生产服务器上部署 Java,或者让 logstash 跟业务代码争夺 Java 资源. 所以,在实际运用中,logstash 进程会被分为两

Poseidon 系统是一个日志搜索平台——认证看链接ppt,本质是索引的倒排列表和原始日志数据都存在HDFS,而文档和倒排的元数据都在NOSQL里,同时针对单个filed都使用了独立索引,使用MR来索引和搜索

Poseidon 系统是一个日志搜索平台,可以在百万亿条.100PB 大小的日志数据中快速分析和检索.360 公司是一个安全公司,在追踪 APT(高级持续威胁)事件,经常需要在海量的历史日志数据中检索某些信息,例如某个恶意样本在某个时间段内的活动情况.在 Poseidon 系统出现之前,都是写 Map/Reduce 计算任务在 Hadoop 集群中做计算,一次任务所需的计算时间从数小时到数天不等,大大制约了 APT 事件的追踪效率.Poseidon 系统就是解决这个需求,能在数百万亿条规模的数据

安装sqoop,并将Mysql中的表数据导出到HDFS下的文本文件

首先是安装mysql数据库.使用 sudo apt-get install mysql-server命令即可安装完成.然后进行表的创建和插入数据.如图. 然后下载sqoop和连接mysql数据库的jar包.接下来是安装sqoop.首先是配置sqoop-env.sh文件.如图. 然后将config-sqoop文件中不需要检查的注释掉.如图. 然后接下来是将sqoop-1.4.4.jar包和连接mysql的jar包copy到hadoop目录下的lib目录中,同时把hadoop-core-1.2.1.

ELK+zookeeper+kafka收集springcould日志配置文档

############ 本文介绍使用ELK(elasticsearch.logstash.kibana) + kafka来搭建一个日志系统.主要演示使用spring aop进行日志收集,然后通过kafka将日志发送给logstash,logstash再将日志写入elasticsearch,这样elasticsearch就有了日志数据了,最后,则使用kibana将存放在elasticsearch中的日志数据显示出来,并且可以做实时的数据图表分析等等. 最开始我些项目的时候,都习惯用log4j来把

带你看懂大数据采集引擎之Flume&采集目录中的日志

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一.Flume的介绍: Flume由Cloudera公司开发,是一种提供高可用.高可靠.分布式海量日志采集.聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于采集数据:同时,flume提供对数据进行简单处理,并写到各种数据接收方的能力,如果能用一句话概括Flume,那么Flume是实时采集日志的数据采集引擎. 二.Flume的体

Elastic Stack实战学习教程~日志数据的收集、分析与可视化

Elastic Stack介绍 近几年,互联网生成数据的速度不断递增,为了便于用户能够更快更精准的找到想要的内容,站内搜索或应用内搜索成了不可缺少了的功能之一.同时,企业积累的数据也再不断递增,对海量数据分析处理.可视化的需求也越来越高. 在这个领域里,开源项目ElasticSearch赢得了市场的关注,比如,去年Elastic公司与阿里云达成合作伙伴关系提供阿里云 Elasticsearch 的云服务.今年10月Elastic公司上市,今年11月举行了Elastic 中国开发者大会.目前各大云

游戏服务器中的日志处理方式之一

在游戏开发的过程中,我们需要记录一些日志,以便以后了解游戏运行的情况,以及根据日志发现并处理游戏中的突发情况. 一,游戏日志可以分为以下几种:1)系统日志2)用户操作日志3)异常日志,即错误日志 系统日志 系统日志一般描述的是服务器日常运行的状态.比如启动是否成功,每天统计一下内存的占用量,CPU的使用量等信息.用于查检服务器运行的健康状况.这对于技术分析来说是非常重要的.如果没有这些信息,一但服务器宕机,我们就两眼一抺黑,不知从何下手了.这部分日志一般产生的文件不大,内容不是太多,可以记录成文

Linux中的日志分析及管理

日志文件对于诊断和解决系统中的问题很有帮助,因为在Linux系统中运行的程序通常会把系统消息和错误消息写入相应的日志文件,这样系统一旦出现问题就会"有据可查".此外,当主机遭受攻击时,日志文件还可以帮助寻找攻击者留下的痕迹.一.主要日志文件在Linux系统中,日志数据主要包括以下三种类型:[内核及系统日志][用户日志][程序日志]Linux系统本身和大部分服务器程序的日志文件默认情况下都放置在目录"/var/log"中.一部分程序公用一个日志文件,一部分程序使用单个