flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统

一、Hadoop配置安装

注意:apache提供的hadoop-2.6.0的安装包是在32位操作系统编译的,因为hadoop依赖一些C++的本地库,

所以如果在64位的操作上安装hadoop-2.4.1就需要重新在64操作系统上重新编译

1.修改Linux主机名

2.修改IP

3.修改主机名和IP的映射关系

######注意######如果你们公司是租用的服务器或是使用的云主机(如华为用主机、阿里云主机等)

/etc/hosts里面要配置的是内网IP地址和主机名的映射关系

4.关闭防火墙

5.ssh免登陆

6.安装JDK,配置环境变量等

集群规划:

主机名IP安装的软件              运行的进程

hadoop01192.168.1.201jdk、hadoopNameNode、DFSZKFailoverController

(zkfc)

hadoop02192.168.1.202jdk、hadoopNameNode、DFSZKFailoverController

(zkfc)

hadoop03192.168.1.203jdk、hadoopResourceManager

hadoop04192.168.1.204jdk、hadoopResourceManager

hadoop05192.168.1.205jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、

QuorumPeerMain

hadoop06192.168.1.206jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、

QuorumPeerMain

hadoop07192.168.1.207jdk、hadoop、zookeeperDataNode、NodeManager、JournalNode、

QuorumPeerMain

说明:

1.在hadoop2.0中通常由两个NameNode组成,一个处于active状态,另一个处于standby状态。Active NameNode对外提供服务,而Standby

NameNode则不对外提供服务,仅同步active namenode的状态,以便能够在它失败时快速进行切换。

hadoop2.0官方提供了两种HDFS HA的解决方案,一种是NFS,另一种是QJM。

这里我们使用简单的QJM。在该方案中,主备NameNode之间通过一组JournalNode同步元数据信息,一条数据只要成功写入多数JournalNode即认为写入成

功。通常配置奇数个JournalNode

这里还配置了一个zookeeper集群,用于ZKFC(DFSZKFailoverController)故障转移,当Active NameNode挂掉了,会自动切换Standby

NameNode为standby状态

2.hadoop-2.2.0中依然存在一个问题,就是ResourceManager只有一个,存在单点故障,hadoop-2.4.1解决了这个问题,有两个

ResourceManager,一个是Active,一个是Standby,状态由zookeeper进行协调

安装步骤:

1.安装配置zooekeeper集群(在hadoop05上)

1.1解压

tar -zxvf zookeeper-3.4.5.tar.gz -C /hadoop/

1.2修改配置

cd /hadoop/zookeeper-3.4.5/conf/

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

修改:dataDir=/hadoop/zookeeper-3.4.5/tmp

在最后添加:

server.1=hadoop05:2888:3888

server.2=hadoop06:2888:3888

server.3=hadoop07:2888:3888

保存退出

然后创建一个tmp文件夹

mkdir /hadoop/zookeeper-3.4.5/tmp

再创建一个空文件

touch /hadoop/zookeeper-3.4.5/tmp/myid

最后向该文件写入ID

echo 1 > /hadoop/zookeeper-3.4.5/tmp/myid

1.3将配置好的zookeeper拷贝到其他节点(首先分别在hadoop06、hadoop07根目录下创建一个hadoop目录:mkdir /hadoop)

scp -r /hadoop/zookeeper-3.4.5/ hadoop06:/hadoop/

scp -r /hadoop/zookeeper-3.4.5/ hadoop07:/hadoop/

注意:修改hadoop06、hadoop07对应/hadoop/zookeeper-3.4.5/tmp/myid内容

hadoop06:

echo 2 > /hadoop/zookeeper-3.4.5/tmp/myid

hadoop07:

echo 3 > /hadoop/zookeeper-3.4.5/tmp/myid

2.安装配置hadoop集群(在hadoop01上操作)

2.1解压

tar -zxvf hadoop-2.4.1.tar.gz -C /hadoop/

2.2配置HDFS(hadoop2.0所有的配置文件都在$HADOOP_HOME/etc/hadoop目录下)

#将hadoop添加到环境变量中

vim /etc/profile

export JAVA_HOME=/usr/java/jdk1.7.0_55

export HADOOP_HOME=/hadoop/hadoop-2.4.1

export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin

#hadoop2.0的配置文件全部在$HADOOP_HOME/etc/hadoop下

cd /hadoop/hadoop-2.6.0/etc/hadoop

2.2.1修改hadoop-env.sh

export JAVA_HOME=/usr/java/jdk1.7.0_55

2.2.2修改core-site.xml

<configuration>
<!-- 指定hdfs的nameservice为wxm -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://wxm</value>
</property>
<!-- 指定hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/wxm/software/hadoop/hadoop-2.6.5/tmp</value>
</property>
<!-- 指定zookeeper地址 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
</configuration>

2.2.3修改hdfs-site.xml

<configuration>
<!--指定hdfs的nameservice为wxm,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>wxm</value>
</property>
<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.wxm</name>
<value>hadoop05,hadoop06</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.wxm.hadoop05</name>
<value>hadoop05:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.wxm.hadoop051</name>
<value>hadoop05:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.wxm.hadoop06</name>
<value>hadoop06:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.wxm.hadoop06</name>
<value>hadoop06:50070</value>
</property>
<!-- 指定NameNode的元数据在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://hadoop05:8485;hadoop06:8485/wxm</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/wxm/software/hadoop/hadoop-2.6.5/journal</value>
</property>
<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.wxm</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- 使用sshfence隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>300000</value>
</property>
</configuration>

2.2.4修改mapred-site.xml

<configuration>
<!-- 指定mr框架为yarn方式 -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

2.2.5修改yarn-site.xml

<configuration>
<!-- 开启RM高可靠 -->
<property>
   <name>yarn.resourcemanager.ha.enabled</name>
   <value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
   <name>yarn.resourcemanager.cluster-id</name>
   <value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
   <name>yarn.resourcemanager.ha.rm-ids</name>
   <value>resourcemanagerAtWorker1,resourcemanagerAtWorker1</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
   <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker1</name>
   <value>worker1</value>
</property>
<property>
   <name>yarn.resourcemanager.hostname.resourcemanagerAtWorker2</name>
   <value>worker2</value>
</property>
<!-- 指定zk集群地址 -->
<property>
   <name>yarn.resourcemanager.zk-address</name>
   <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
<property>
   <name>yarn.nodemanager.aux-services</name>
   <value>mapreduce_shuffle</value>
</property>
</configuration>

2.2.6修改slaves(slaves是指定子节点的位置,因为要在hadoop01上启动HDFS、在hadoop03启动yarn,所以hadoop01上的

slaves文件指定的是datanode的位置,hadoop03上的slaves文件指定的是nodemanager的位置)

hadoop05

hadoop06

hadoop07

2.2.7配置免密码登陆

#首先要配置hadoop01到hadoop02、hadoop03、hadoop04、hadoop05、hadoop06、hadoop07的免密码登

#在hadoop01上生产一对钥匙

ssh-keygen -t rsa

#将公钥拷贝到其他节点,包括自己

ssh-coyp-id hadoop01

ssh-coyp-id hadoop02

ssh-coyp-id hadoop03

ssh-coyp-id hadoop04

ssh-coyp-id hadoop05

ssh-coyp-id hadoop06

ssh-coyp-id hadoop07

#配置hadoop03到hadoop04、hadoop05、hadoop06、hadoop07的免密码登陆

#在hadoop03上生产一对钥匙

ssh-keygen -t rsa

#将公钥拷贝到其他节点

ssh-coyp-id hadoop04

ssh-coyp-id hadoop05

ssh-coyp-id hadoop06

ssh-coyp-id hadoop07

#注意:两个namenode之间要配置ssh免密码登陆,别忘了配置hadoop02到hadoop01的免登陆

在hadoop02上生产一对钥匙

ssh-keygen -t rsa

ssh-coyp-id -i hadoop01

2.4将配置好的hadoop拷贝到其他节点

scp -r /hadoop/ hadoop02:/

scp -r /hadoop/ hadoop03:/

scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/

scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/

scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/

scp -r /hadoop/hadoop-2.4.1/ [email protected]:/hadoop/

###注意:严格按照下面的步骤

2.5启动zookeeper集群(分别在hadoop05、hadoop06、hadoop07上启动zk)

cd /hadoop/zookeeper-3.4.5/bin/

./zkServer.sh start

#查看状态:一个leader,两个follower

./zkServer.sh status

2.6启动journalnode(分别在在hadoop05、hadoop06、tcast07上执行)

cd /hadoop/hadoop-2.4.1

sbin/hadoop-daemon.sh start journalnode

#运行jps命令检验,hadoop05、hadoop06、hadoop07上多了JournalNode进程

2.7格式化HDFS

#在hadoop01上执行命令:

hdfs namenode -format

#格式化后会在根据core-site.xml中的hadoop.tmp.dir配置生成个文件,这里我配置的是/hadoop/hadoop-2.4.1/tmp,然后

将/hadoop/hadoop-2.4.1/tmp拷贝到hadoop02的/hadoop/hadoop-2.4.1/下。

scp -r tmp/ hadoop02:/hadoop/hadoop-2.4.1/

2.8格式化ZK(在hadoop01上执行即可)

hdfs zkfc -formatZK

2.9启动HDFS(在hadoop01上执行)

sbin/start-dfs.sh

2.11启动zkfc

hadoop-daemon.sh start zkfc

2.10启动YARN(#####注意#####:是在hadoop03上执行start-yarn.sh,把namenode和resourcemanager分开是因为性能问题,因为

他们都要占用大量资源,所以把他们分开了,

他们分开了就要分别在不同的机器上启动)

sbin/start-yarn.sh

到此,hadoop-2.4.1配置完毕,可以统计浏览器访问:

http://192.168.1.201:50070

NameNode ‘hadoop01:9000‘ (active)

http://192.168.1.202:50070

NameNode ‘hadoop02:9000‘ (standby)

验证HDFS HA

首先向hdfs上传一个文件

hadoop fs -put /etc/profile /profile

hadoop fs -ls /

然后再kill掉active的NameNode

kill -9 <pid of NN>

通过浏览器访问:http://192.168.1.202:50070

NameNode ‘hadoop02:9000‘ (active)

这个时候hadoop02上的NameNode变成了active

在执行命令:

hadoop fs -ls /

-rw-r--r--   3 root supergroup       1926 2014-02-06 15:36 /profile

刚才上传的文件依然存在!!!

手动启动那个挂掉的NameNode

sbin/hadoop-daemon.sh start namenode

通过浏览器访问:http://192.168.1.201:50070

NameNode ‘hadoop01:9000‘ (standby)

验证YARN:

运行一下hadoop提供的demo中的WordCount程序:

hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.4.1.jar wordcount /profile /out

OK,大功告成!!!

二、spark安装

standalone模式搭建比较简单  参考官网即可

三、kafka集群搭建

1.  kafka2.11下载并解压

2.  修改配置文件

· config/server.properties
        broker.id=4(集群里的id不能重复,我是取每台机器IP最后一位)
        listeners=PLAINTEXT://192.168.248.134:9092(格式不变,绑定本机IP)
        log.dirs=/home/hadoop/kafka/logs4kafka(日志路径)
        zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
最后一项时延可以增大一些,曾经因为超过时延而报错,修改成10倍就OK了
     ·producer.properties
        bootstrap.servers=h2:9092,h3:9092,h4:9092,h8:9092,h9:9092,h10:9092
     ·consumer.properties
        zookeeper.connect=h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181
        group不配的话就是默认,看需求
    3.拷贝到其他节点,注意修改listeners绑定的IP和broker.id

4 基本命令

#start
    bin/kafka-server-start.sh config/server.properties

#create a topic
    bin/kafka-topics.sh --create --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --replication-factor 5 --partition 5 --topic T20161021

#list all topics
    bin/kafka-topics.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --list

#describe the detail of this topic
    bin/kafka-topics.sh --describe --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181  --topic test101

#write some Messages to the topic

bin/kafka-console-producer.sh --broker-list h2:9092 --topic T20161021

#recerive the message producer writed
    bin/kafka-console-consumer.sh --zookeeper h2:2181,h3:2181,h4:2181,h8:2181,h9:2181,h10:2181 --topic T20161021 --from-beginning

四、flume-ng 1.6安装配置

1.下载安装并配置好flume的运行环境

2.编写配置文件

# ------------------- 定义数据流----------------------
    agent.sources = kafkaSource
    agent.channels = memoryChannel
    agent.sinks = hdfsSink
    agent.sources.kafkaSource.channels = memoryChannel
    agent.sinks.hdfsSink.channel = memoryChannel

#-------- kafkaSource相关配置-----------------
    agent.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.kafkaSource.zookeeperConnect = h2:2181,h3:2181,h4:2181
    agent.sources.kafkaSource.topic = T20161031
    #agent.sources.kafkaSource.groupId = flume
    agent.sources.kafkaSource.kafka.consumer.timeout.ms = 1000

#------- memoryChannel相关配置-------------------------
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity=10000
    agent.channels.memoryChannel.transactionCapacity=1000

#---------hdfsSink 相关配置------------------
    agent.sinks.hdfsSink.type = hdfs
    agent.sinks.hdfsSink.hdfs.path = hdfs://h4:9000/user/test/kafka2HdfsByFlume
    agent.sinks.hdfsSink.hdfs.writeFormat = Text
    agent.sinks.hdfsSink.hdfs.fileType = DataStream

3.启动Flume
    bin/flume-ng agent --conf conf --conf-file conf/kafka2HdfsByFlume.conf --name agent -Dflume.root.logger=INFO,console

五、sparkStreaming1.6.2

   
import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.*;import scala.Tuple2;import java.util.Arrays;public class StreamingOnHDFS {public static void main(String[] args){final SparkConf conf = newSparkConf().setMaster("spark://h4:7077").setAppName("SparkStreamingOnHDFS") ;Durations.seconds(5);//Durations.seconds(5) 设置每隔 5 秒final String checkpointDirectory = "hdfs://h4:9000/library/SparkStreaming/Checkpoint_Data";JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {@Overridepublic JavaStreamingContext create() {return createContext(checkpointDirectory, conf) ;}} ;JavaStreamingContext jsc = JavaStreamingContext. getOrCreate(checkpointDirectory, factory) ;JavaDStream lines = jsc.textFileStream("hdfs://h4:9000/user/test/kafka2HdfsByFlume") ;
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {public Iterable<String> call(String line) throws Exception {return Arrays. asList(line.split(" ")) ;}}) ;JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String,Integer>() {public Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word, 1) ;}}) ;JavaPairDStream<String, Integer> wordscount = pairs.reduceByKey(new Function2<Integer, Integer,Integer>() {public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2;}}) ;wordscount.print();jsc.start() ;jsc.awaitTermination() ;jsc.close() ;}
private static JavaStreamingContext createContext(String checkpointDirectory, SparkConf conf){System. out.println("==========Creating new context==============") ;JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations. seconds(5)) ;ssc.checkpoint(checkpointDirectory) ;return ssc;}}

六、测试

1.将程序打成runnable jar并上传

2.启动zookeeper、hadoop、spark、kafka、flume

3.进入spark安装目录下执行

bin/spark-submit --class com.unisk.spark.sparkStreaming.StreamingOnHDFS --master spark://h4:7077 /home/hadoop/spark/Apps4Spark/StreamingOnHDFS.jar

七、扩展

数据导入HDFS之后即可进行基于MR/spark的批处理或准实时的流式处理,可作海量数据的清洗工作或预处理,结果可存入Hbase、redis或RDBMS以做进一步的展示或挖掘

时间: 2024-08-09 10:35:40

flume + kafka + sparkStreaming + HDFS 构建实时日志分析系统的相关文章

构建AWStats日志分析系统

需求描述 管理员搭建完服务器,要对网站的性能做后期的不断的分析和调整,以至达到最完美的状态.针对服务器每天的日志访问量.高峰时间.压力等等是通过日志信息系统分析.如果事前没有预估,没有给定足相应的cpu.内存.假如有一天突然高发值,服务器会直接崩溃.通过观察日志,以便在以后工作调整中提出整改方案. 简介 在httpd服务器的访问日志文件access_log中,记录了大量的客户机访问信息,通过分析这些信息,可以及时了解Web站点的访问情况,通过AWStats日志分析系统,以完成自动化的日志分析与统

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

使用GoAccess构建简单实时日志分析系统

很早就知道Nginx日志分析工具GoAccess,但之前由于只能静态分析,感觉不太强大.最近发现它能够实时显示报表而且报表也比之前强大很多能做趋势分析.因此果断下载安装.以下是基于CentOS的安装配置步骤. 1.安装 GeoIPsudo yum install geoip geoip-devel 2.安装centos: yum install go access或源码安装$ wget http://tar.goaccess.io/goaccess-1.1.1.tar.gz $ tar -xzv

centos7搭建ELK开源实时日志分析系统

Elasticsearch 是个开源分布式搜索引擎它的特点有分布式零配置自动发现索引自动分片索引副本机制 restful 风格接口多数据源自动搜索负载等. Logstash 是一个完全开源的工具他可以对你的日志进行收集.分析并将其存储供以后使用如搜索. kibana 也是一个开源和免费的工具他 Kibana 可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面可以帮助您汇总.分析和搜索重要数据日志. 日志从客户端到服务端处理后在传递给客户的数据流流向如下

实时日志分析系统ELK 部署与运行中的问题汇总

前记: 去年测试了ELK,今年测试了Storm,最终因为Storm需要过多开发介入而放弃,选择了ELK.感谢互联网上各路大神,目前总算是正常运行了. logstash+elasticsearch+kibana的搭建参考:http://wsgzao.github.io/post/elk/.由于搭建过程比较简单就不赘述,主要分享几个坑. 正文: 1.日志如何获取 无论是storm方案还是elk,都涉及这个关键问题.为减少和运维.开发的交叉,尽可能独立.快速,加之当时发现了justniffer这个“神

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMy

CentOS7下Elastic Stack 5.0日志分析系统搭建

一.概述 Elasticsearch是个开源分布式搜索引擎,它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等. Logstash是一个开源的用于收集,分析和存储日志的工具. Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以汇总.分析和搜索重要数据日志. Beats是elasticsearch公司开源的一款采集系统监控数据的代理ag

ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://tchuairen.blog.51cto.com/3848118/1861167 什么要做日志分析平台? 随着业务量的增长,每天业务服务器将会产生上亿条的日志,单个日志文件达几个GB,这时我们发现用Linux自带工具,cat grep awk 分析越来越力不从心了,而且除了服务器日志,还有程序报错日志,分布在不同的服务器,查阅繁琐. 待解决的痛点: 1.大量不同种类的日志成为了运

实时海量日志分析系统的架构设计、实现以及思考

1 序 对ETL系统中数据转换和存储操作的相关日志进行记录以及实时分析有助于我们更好的观察和监控ETL系统的相关指标(如单位时间某些操作的处理时间),发现系统中出现的缺陷和性能瓶颈. 由于需要对日志进行实时分析,所以Storm是我们想到的首个框架.Storm是一个分布式实时计算系统,它可以很好的处理流式数据.利用storm我们几乎可以直接实现一个日志分析系统,但是将日志分析系统进行模块化设计可以收到更好的效果.模块化的设计至少有两方面的优点: 模块化设计可以使功能更加清晰.整个日志分析系统可以分