[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMyNTYyMg%3D%3D&devicetype=iMac+MacBookPro9%2C2+OSX+OSX+10.10.3+build(14D136)&version=11000003&pass_ticket=HKR%2BXKPFBrbVIWepmb7SozvfYdm5CIHU8HWlVnE78YKUsYHCq65XPAv9e1W48Ts1

虽然我一直不赞成完全用开源软件组装成一个系统, 但是对于初创公司来说, 效率高并且成本小, 还是有潜在的应用空间的. 风险就是系统的维护.

本文介绍了如何使用flume+kafka+storm+mysql构建一个分布式大数据流式架构,涉及基本架构,安装部署等方面的介绍。

架构图

数据流向图

(是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱)

实时日志分析系统架构简介

系统主要分为四部分:

1).数据采集

负责从各节点上实时采集数据,选用cloudera的flume来实现

2).数据接入

由于采集数据的速度和数据处理的速度不一定同步,因此添加一个消息中间件来作为缓冲,选用apache的kafka

3).流式计算

对采集到的数据进行实时分析,选用apache的storm

4).数据输出

对分析后的结果持久化,暂定用mysql

详细介绍各个组件及安装配置:

操作系统:centos6.4

Flume

Flume是Cloudera提供的一个分布式、可靠、和高可用的海量日志采集、聚合和传输的日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。

下图为flume典型的体系结构:

Flume数据源以及输出方式:

Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力,在我们的系统中目前使用exec方式进行日志采集。

Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。在我们系统中由kafka来接收。

Flume版本:1.4.0

Flume下载及文档:

http://flume.apache.org/

Flume安装:

$tar zxvf apache-flume-1.4.0-bin.tar.gz /usr/local

Flume启动命令:

$bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console

注意事项:需要更改conf目录下的配置文件,并且添加jar包到lib目录下。

Kafka

Kafka是一个消息中间件,它的特点是:

1、关注大吞吐量,而不是别的特性

2、针对实时性场景

3、关于消息被处理的状态是在consumer端维护,而不是由kafka server端维护。

4、分布式,producer、broker和consumer都分布于多台机器上。

下图为kafka的架构图:

Kafka版本:0.8.0

Kafka下载及文档:http://kafka.apache.org/

Kafka安装:

> tar xzf kafka-<VERSION>.tgz

> cd kafka-<VERSION>

> ./sbt update

> ./sbt package

> ./sbt assembly-package-dependency Kafka

启动及测试命令:

(1) start server

> bin/zookeeper-server-start.sh config/zookeeper.properties

> bin/kafka-server-start.sh config/server.properties

(2)Create a topic
> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test

> bin/kafka-list-topic.sh --zookeeper localhost:2181

(3)Send some messages

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

(4)Start a consumer

> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

storm

Storm是一个分布式的、高容错的实时计算系统。

Storm架构图:

storm工作任务topology:

Storm 版本:0.9.0

Storm 下载:http://storm-project.net/

Storm安装:

第一步,安装Python2.7.2

# wget http://www.python.org/ftp/python/2.7.2/Python-2.7.2.tgz

# tar zxvf Python-2.7.2.tgz

# cd Python-2.7.2

# ./configure

# make

# make install

# vi /etc/ld.so.conf

第二步,安装zookeeper(kafka自带zookeeper,如果选用kafka的,该步可省略)

#wget http://ftp.meisei-u.ac.jp/mirror/apache/dist//zookeeper/zookeeper-3.3.3/zoo keeper-3.3.3.tar.gz

# tar zxf zookeeper-3.3.3.tar.gz

# ln -s /usr/local/zookeeper-3.3.3/ /usr/local/zookeeper

# vi ~./bashrc (设置ZOOKEEPER_HOME和ZOOKEEPER_HOME/bin)

第三步,安装JAVA

$tar zxvf jdk-7u45-linux-x64.tar.gz /usr/local

如果使用storm0.9以下版本需要安装zeromq及jzmq。

第四步,安装zeromq以及jzmq

jzmq的安装貌似是依赖zeromq的,所以应该先装zeromq,再装jzmq。

1)安装zeromq(非必须):

  • # wget http://download.zeromq.org/historic/zeromq-2.1.7.tar.gz
  • # tar zxf zeromq-2.1.7.tar.gz
  • # cd zeromq-2.1.7
  • # ./configure
  • # make
  • # make install
  • # sudo ldconfig (更新LD_LIBRARY_PATH)

缺少c++环境:yum install gcc-c++

之后遇到的问题是:Error:cannot link with -luuid, install uuid-dev

这是因为没有安装uuid相关的package。

解决方法是:# yum install uuid*

# yum install e2fsprogs*

# yum install libuuid*

2)安装jzmq(非必须)

  • # yum install git
  • # git clone git://github.com/nathanmarz/jzmq.git
  • # cd jzmq
  • # ./autogen.sh
  • # ./configure
  • # make
  • # make install

然后,jzmq就装好了,这里有个网站上参考到的问题没有遇见,遇见的童鞋可以参考下。在./autogen.sh这步如果报错:autogen.sh:error:could not find libtool is required to run autogen.sh,这是因为缺少了libtool,可以用#yum install libtool*来解决。

如果安装的是storm0.9及以上版本不需要安装zeromq和jzmq,但是需要修改storm.yaml来指定消息传输为netty:

storm.local.dir: "/tmp/storm/data"

storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

第五步,安装storm

$unzip storm-0.9.0-wip16.zip

备注:单机版不需要修改配置文件,分布式在修改配置文件时要注意:冒号后必须加空格。

测试storm是否安装成功:

1. 下载strom starter的代码 git clone https://github.com/nathanmarz/storm-starter.git

2. 使用mvn -f m2-pom.xml package 进行编译

如果没有安装过maven,参见如下步骤安装:
1.从maven的官网下载http://maven.apache.org/

tar zxvf apache-maven-3.1.1-bin.tar.gz /usr/local

配置maven环境变量

export MAVEN_HOME=/usr/local/maven

export PATH=$PATH:$MAVEN_HOME/bin

验证maven是否安装成功:mvn -v

修改Storm-Starter的pom文件m2-pom.xml ,修改dependency中twitter4j-core 和 twitter4j-stream两个包的依赖版本,如下:

org.twitter4j
twitter4j-core
[2.2,)

org.twitter4j
twitter4j-stream
[2.2,)

编译完后生成target文件夹

启动zookeeper

zkServer.sh start

启动nimbus supervisor ui

storm nimbus

storm supervisor

storm ui

jps查看启动状态

进入target目录执行:

storm jar storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology wordcountTop

然后查看http://localhost:8080

注释:单机版 不用修改storm.yaml

kafka和storm整合

1.下载kafka-storm0.8插件:https://github.com/wurstmeister/storm-kafka-0.8-plus

2.该项目下载下来需要调试下,找到依赖jar包。然后重新打包,作为我们的storm项目的jar包。

3.将该jar包及kafka_2.9.2-0.8.0-beta1.jar metrics-core-2.2.0.jar scala-library-2.9.2.jar (这三个jar包在kafka-storm-0.8-plus项目依赖中能找到)

备注:如果开发的项目需要其他jar,记得也要放进storm的Lib中比如用到了mysql就要添加mysql-connector-java-5.1.22-bin.jar到storm的lib下

flume和kafka整合

1.下载flume-kafka-plus: https://github.com/beyondj2ee/flumeng-kafka-plugin

2.提取插件中的flume-conf.properties文件

修改该文件:#source section

producer.sources.s.type = exec
producer.sources.s.command = tail -f -n+1 /mnt/hgfs/vmshare/test.log
producer.sources.s.channels = c

修改所有topic的值改为test

将改后的配置文件放进flume/conf目录下

在该项目中提取以下jar包放入环境中flume的lib下:

以上为单机版的flume+kafka+storm的配置安装

flume+storm插件

https://github.com/xiaochawan/edw-Storm-Flume-Connectors

启动步骤

安装好storm,flume,kafka之后开始项目部署启动(在部署启动之前最好按照安装文档进行storm kafka flume各个组件测试)。

第一步
将编写好的storm项目打成jar包放入服务器上,假如放在/usr/local/project/storm.xx.jar

注:关于storm项目的编写见安装文档中的 kafka和storm整合 。

第二步

启动zookeeper(这里可以启动kafka自带的zookeeper或者启动单独安装的kafka,以下以kafka自带为例)

cd /usr/local/kafka

bin/zookeeper-server-start.sh config/zookeeper.properties
第三步
启动kafka
cd /usr/local/kafka
> bin/kafka-server-start.sh config/server.properties
创建主题
> bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
注:因为kafka消息的offset是由zookeeper记录管理的,所以在此需指定zookeeper的ip,replica 表示该主题的消息被复制几份,partition 表示每份主题被分割成几部分。test表示主题名称。
第四步
启动storm
> storm nimbus
> storm supervisor
> storm ui
cd /usr/local/project/
> storm jar storm.xx.jar storm.testTopology test
注:storm.xx.jar 为我们编写好的storm项目jar包,第一步完成的工作。 storm.testTopology 为storm项目中main方法所在的类路径。test为此次topology的名字。
第五步
启动flume
cd /usr/local/flume
bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer 
注:flume.conf.properties为我们自定义的flume配置文件,flume安装好后是没有此文件的,需要我们自己编写,编写方式见flume安装的文章。

至此需要启动的程序已经全部启动,storm项目已经开始运行,可以打开storm ui 观察运行是否正常。
http://localhost:8080
注:此处ip为storm nimbus所在机器Ip 端口可在storm配置文件 storm/conf/storm.yaml中修改

本文摘自http://blog.csdn.net/mylittlered/article/details/20810265 标题为:“flume+kafka+storm+mysql架构设计” 。

时间: 2024-07-31 14:28:15

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统的相关文章

利用flume+kafka+storm+mysql构建大数据实时系统

架构图 数据流向图 1.Flume 的一些核心概念: 2.数据流模型 Flume以agent为最小的独立运行单位.一个agent就是一个JVM.单agent由Source.Sink和Channel三大组件构成,如下图: Flume的数据流由事件(Event)贯穿始终.事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息,这些Event由Agent外部的Source,比如上图中的Web Server生成.当Source捕获事件后会进行特定的格式化,然后Source会把事件

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

flume+kafka+storm+mysql架构设计

前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考. 这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql (项目是maven项目,需要改动mysql配置,提供两种topology:读取本地文件(用来本地测试):读取服务器日志文件.) (是visio画的,图太大,放上来字看起来比较小,如果有需要的朋友留邮箱) 实时日志分析系统架构简介 系统主要分为四部分:                         负责从各节点上

从0到1构建大数据生态系列1:数据蛮荒中的拓荒之举

缘起 我们都知道,当前大数据的需求基本属于遍地开花.无论是帝都.魔都,还是广州.深圳,亦或是全国其他各地,都在搞大数据:不管是不到百人的微小公司,还是几百上千人的中型公司,亦或是上万的大型公司,都在需求数据岗位. 大公司暂且不论,他们一切都走在前头.那么,对于中小型企业来说,开始尝试以数据的思维去思考问题,开始涉足大数据领域,这就是一个从0到1的过程了. 有(bu)幸(xing),近半年来,我亲自见证以及亲身体会到了这个过程,或者至今仍然在完善1这个过程中.期间,有痛苦有坑.有喜悦有成功.有沉静

引跑科技助力传统企业构建大数据价值体系

2015年8月11日,在工信部信息化推进司指导下,由国家两化融合创新推进联盟主办的第二届<云计算应用创新推进大会>在北京西苑酒店成功召开.大会将以"推进产业互联.探索工业4.0发展道路"为主题,深度探讨企业"云化"过程面临的挑战与问题,寻求最佳技术解决方案和实施路线,推进"企业云"建设,实现企业信息化的全面升级. 本次会议上共计有超过10多家的国有大中型企业及IT服务和供应商分享在云计算.大数据领域的实践和应用成果. 国家两化联盟,全

mysql jdbc大数据

ps = (PreparedStatement) con.prepareStatement("select * from bigTable", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ps.setFetchSize(Integer.MIN_VALUE); ps.setFetchDirection(ResultSet.FETCH_REVERSE); mysql 处理大数据 原文地址:https://www.cnb

Flume+Kafka+Storm+Redis 大数据在线实时分析

1.实时处理框架 即从上面的架构中我们可以看出,其由下面的几部分构成: Flume集群 Kafka集群 Storm集群 从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同的集群系统之间打通(从上面的图示中也能很好地说明这一点),即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Storm的整合.当然,各个环境是否使用集群,依个人的实际需要而定,在我们的环境中,Flume.Kafka.Storm都使用集群. 2. Flume+Kafka整合 2.1 整合思路

Flume+Kafka+Storm+Redis实时分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 今天作者要在这里通过一个简单的电商网站订单实时分析系统和大家一起梳理一下大数据环境下的实时分析系统的架构模型.当然这个架构模型只是实时分析技术的一 个简单的入门级架构,实际生产环境中的大数据实时分析技术还涉及到很多细节的处理, 比如使用Storm的ACK机制保证数据都能被正确处理, 集群的高可用架构, 消费数据时如何处理重复数据或者丢失数据等问题,根据不同的业务场景,对数据的可靠性要求以及系统的复杂度的要

新版flume+kafka+storm安装部署

安装步骤: 1.版本介绍: zookeeper3.4.6 flume-ng1.6 kafka2.10-0.8.2 storm0.9.5 2.安装zookeeper 1.下载最新release版zookeeper http://zookeeper.apache.org/releases.html#download 2.修改zookeeper配置文件 $zookeeper_home/conf $ cp zoo_sample.cfg zoo_sample.cfg.bak $ mv zoo_sample