Hadoop数据收集与入库系统Flume与Sqoop

Hadoop提供了一个中央化的存储系统,其有利于进行集中式的数据分析与数据共享。 Hadoop对存储格式没有要求。可以存储用户访问日志、产品信息以及网页数据等数据。

常见的两种数据来源。一种是分散的数据源:机器产生的数据、用户访问日志以及用户购买日志。另一种是传统系统中的数据:传统关系型数据库(MySQL、Oracle)、磁盘阵列以及磁带。

Flume由三部分构成。Master负责负责通信及配置管理,是集群的控制器。Collector用于对数据进行聚合。往往会产生一个更大的数据流。然后加载到HDFS上。Agent负责采集数据。其是Flume中产生数据流的地方,同时Agent会将产生的数据传输到Collector.

Agent 用于采集数据是数据流产生的地方。通常由source和sink两部分组成 。Source用于获取数据,可从文本文件,syslog,HTTP等获 取数据。 Sink将Source获得的数据进一步传输给后面的Collector。
Flume自带了很多source和sink实现。 syslogTcp(5140) | agentSink("localhost",35853) 意思是通过通信协议获取5140端口数据,然后发送到localhost主机的35853端口。 tail("/etc/services") | agentSink("localhost",35853) 意思是读取文件夹/etc/services下文件,然后发送到localhost主机的35853端口。

Collector 用于汇总多个Agent结果 ,将汇总结果导入后端存储系统,比如HDFS,HBase。 Flume自带了很多collector实现 collectorSource(35853) | console  表示将收集结果写到控制台。CollectorSource(35853)
| collectorSink("file:///tmp/flume/collected", "syslog") 表示将收集结果写到本地目录。collectorSource(35853) | collectorSink("hdfs://namenode/user/flume/ ","syslog"); 表示将收集结果写到hdfs目录。

一般为了防止一个collector挂掉所有agent都失效可以将不同的agent连接不同的collector。同时,这样可以保证不同数据类型的agent将数据放到同一个collector。

Master 负责管理协调 agent 和collector的配置信息,是Flume集群的控制器。其可跟踪数据流的最后确认信息,并通知agent。 通常需配置多个master以防止单点故障。借助zookeeper管理管理多Master。

构建基于Flume的数据收集系统 

Agent和Collector均可以动态配置。可通过命令行或Web界面配置。 命令行配置 : 在已经启动的master节点上,依次输入”flume shell””connect localhost ”    如执行 exec config a1 ‘tailDir(“/data/logfile”)’ ‘agentSink’ 。 Web界面 : 选中节点,填写source、sink等信息。

常用架构举例—拓扑1  

agentA : tail(“/ngnix/logs”) | agentSink("collector",35853);

agentB : tail(“/ngnix/logs”) | agentSink("collector",35853);

agentC : tail(“/ngnix/logs”) | agentSink("collector",35853);

agentD : tail(“/ngnix/logs”) | agentSink("collector",35853);

agentE : tail(“/ngnix/logs”) | agentSink("collector",35853);

agentF : tail(“/ngnix/logs”) | agentSink("collector",35853); collector : collectorSource(35853) | collectorSink("hdfs://namenode/flume/","srcdata");

agentA : src | agentE2ESink("collectorA",35853);

agentB : src | agentE2ESink("collectorA",35853);

agentC : src | agentE2ESink("collectorB",35853);

agentD : src | agentE2ESink("collectorB",35853);

agentE : src | agentE2ESink("collectorC",35853);

agentF : src | agentE2ESink("collectorC",35853);

collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");

collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");

collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");

agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");

agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");

agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");

agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");

agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");

agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");

collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");

collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");

collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");

传统数据库与Hadoop间数据同步

Sqoop:SQL-to-Hadoop,是连接传统关系型数据库和Hadoop 的桥梁。可以把关系型数据库的数据导入到 Hadoop 系统 ( 如 HDFS HBase 和 Hive) 中或把数据从 Hadoop 系统里抽取并导出到关系型数据库里/利用MapReduce可以加快数据传输速度实现批处理方式进行数据传输。

Sqoop优势是高效、可控地利用资源、任务并行度,超时时间等 、数据类型映射与转换 可自动进行或用户也可自定义 。同时,其 支持多种数据库如:MySQL 、Oracle 以及PostgreSQL 。

通过命令sqoop可以生成map task将传统数据库中的数据传输到HDFS、Hbase或Hive上。

Sqoop import 将数据从关系型数据库导入Hadoop中 

        步骤1:Sqoop与数据库Server通信,获取数据库表的元数据 信息; 步骤2:Sqoop启动一个MapOnly的MR作业,利用元数据信 息并行将数据写入Hadoop。

Sqoop import使用:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities

--connnect: 指定JDBC URL

--username/password:mysql数据库的用户名

--table:要读取的数据库表

bin/hadoop fs -cat cities/part-m-*

1,USA,Palo Alto

2,Czech Republic,Brno

3,USA,Sunnyvale

导入到指定目录:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--target-dir /etl/input/cities

导入字段country为USA的值:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--where "country = ‘USA‘"

生成顺序文件:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--as-sequencefile

指定map个数:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--num-mappers 10

Sqoop import—导入多个表 :

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--query ‘SELECT normcities.id, \

countries.country, \

normcities.city \

FROM normcities \

JOIN countries USING(country_id) \

WHERE $CONDITIONS‘ \

--split-by id \

--target-dir cities

Sqoop import增量导入(一):

适用于数据每次被追加到数据库中,而已有数据不变的情况且仅导入id这一列值大于1的记录。

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table visits \

--incremental append \

--check-column id \

--last-value 1

Sqoop import增量导入(二)

每次成功运行后,sqoop将最后一条记录的id值保存到 metastore中,供下次使用。

sqoop job \

--create visits \

--import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table visits \

--incremental append \

--check-column id \

--last-value 0

运行sqoop作业:sqoop job --exec visits

Sqoop import增量导入(三)

数据库中有一列last_update_date,记录了上次修改时间。 Sqoop仅将某时刻后的数据导入Hadoop。

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table visits \

--incremental lastmodified \

--check-column last_update_date \

--last-value “2013-05-22 01:01:01”

Sqoop Export 将数据从Hadoop导入关系型数据库导中

步骤1:Sqoop与数据库Server通信,获取数据库表的元数据 信息; 步骤2:并行导入数据:将Hadoop上文件划分成若 干个split; 每个split由一个Map Task进 行数据导入。

Sqoop Export使用方法

sqoop export \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--export-dir cities

--connnect: 指定JDBC URL

--username/password:mysql数据库的用户名

--table:要导入的数据库表

export-dir:数据在HDFS上存放目录

Sqoop Export—保证原子性:

sqoop export \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--staging-table staging_cities

Sqoop Export—更新已有数据:

sqoop export \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--update-key id sqoop export \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--update-key id \

--update-mode allowinsert

Sqoop Export—选择性插入 :

sqoop export \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--columns country,city

Sqoop与其他系统结合

Sqoop可以与Oozie、Hive、Hbase等系统结合;  用户需要在sqoop-env.sh中增加HBASE_HOME、 HIVE_HOME等环境变量。

Sqoop与Hive结合:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--hive-import

Sqoop与HBase结合:

sqoop import \

--connect jdbc:mysql://mysql.example.com/sqoop \

--username sqoop \

--password sqoop \

--table cities \

--hbase-table cities \

--column-family world

以上介绍了数据收集系统以及数据传输工具sqoop,后续将讲解Hive及Pig相关主题。

时间: 2024-10-11 23:01:55

Hadoop数据收集与入库系统Flume与Sqoop的相关文章

hadoop四----数据收集flume

Flume是一个分布式的.可靠的.可用的服务,用于从许多不同的源上有效地搜集.汇总.移动大量数据日志到一个集中式的数据存储中.并且它是一个简单的和灵活的基于流的数据流架构.它具有鲁棒性和容错机制以及故障转移和恢复的机制.对于分析的应用中它使用一个简单的可扩展的数据模型.Flume传输的数据可以是网络,媒体等产生. Apache Flume是Apache软件基金会的一个顶级项目. 源-Source,接收器-Sink,通道-Channel flume是cloudera公司的一款高性能.高可能的分布式

大数据技术之_18_大数据离线平台_02_Nginx+Mysql+数据收集+Web 工程 JS/JAVA SDK 讲解+Flume 故障后-如何手动上传 Nginx 日志文件至 HDFS 上

十一.Nginx11.1.介绍11.2.常见其他 Web 服务器11.3.版本11.4.Nginx 安装11.5.目录结构11.6.操作命令十二.Mysql12.1.介绍12.2.关系型数据库(SQL)种类12.3.特征12.4.术语12.4.与非关系型数据库比较(Not Only SQL)12.4.1.种类12.4.2.特征12.4.3.总结十三.数据收集13.1.收集方式13.2.数据的事件类型13.2.1.Launch 事件13.2.2.PageView 事件13.3.Nginx 日志收集

多功能表单填报系统V1.2.1-适用于在线报名系统、调查、数据收集等

多功能表单系统V1.2.1 前台:http://www.schoolms.net/mysoft/biaodan/index.asp 后台:http://www.schoolms.net/mysoft/biaodan/admin/index.asp 本系统适用于填报收集一些数据信息,例如在线报名系统.教师获奖登记.调查等等系统应用. 数据格式有文本框输入.单选.多选,3种模式.文件上传功能暂无法实现. 支持二级管理员权限,可以分别管理各个表单项目. 可以把数据导出为Excel 其中多选,是否必选,

数据集成:Flume和Sqoop

Flume和Sqoop是Hadoop数据集成和收集系统,两者的定位不一样,下面根据个人的经验与理解和大家做一个介绍: Flume由cloudera开发出来,有两大产品:Flume-og和Flume-ng,Flume-og的架构过于复杂,在寻问当中会有数据丢失,所以放弃了.现在我们使用的是Flume-ng,主要是日志采集,这个日志可以是TCP的系统的日志数据,可以是文件数据(就是通常我们在Intel服务器,通过其中的机构传过来的接口,或者通过防火墙采集过来的日志),在HDFS上去存储,可以和kaf

使用nginx lua实现网站统计中的数据收集

使用nginx lua实现网站统计中的数据收集 导读 网站数据统计分析工具是各网站站长和运营人员经常使用的一种工具,常用的有 谷歌分析.百度统计和腾讯分析等等.所有这些统计分析工具的第一步都是网站访问数据的收集.目前主流的数据收集方式基本都是基于javascript的.在此简要分析数据收集的原理,并按照步骤,带领大家一同搭建一个实际的数据收集系统. 数据收集原理分析 简单来说,网站统计分析工具需要收集到用户浏览目标网站的行为(如打开某网页.点击某按钮.将商品加入购物车等)及行为附加数据(如某下单

典型大数据计算模式与系统

典型大数据计算模式 典型系统 大数据查询分析计算 HBase,Hive,Cassandra,Impala,Shark,Hana等 批处理计算 Hadoop MapReduce,Spark等 流式计算 Scribe,Flume,Storm,S4, Spark Steaming等 迭代计算 HaLoop,iMapReduce,Twister,Spark等 图计算 Pregel,Giraph,Trinity,PowerGraph,GraphX等 内存计算 Dremel,Hana,Spark等

网站统计中的数据收集原理及实现

转载自:http://blog.sina.com.cn/s/blog_62b832910102w5mx.html Avinash Kaushik将点击流数据的获取方式分为4种:log files.web beacons.JavaScript tags和packet sniffers,其中包嗅探器(packet sniffers)比较不常见,最传统的获取方式是通过WEB日志文件(log files):而beacons和JavaScript是目前较为流行的方式,Google Analytics目前就

网站统计中的数据收集原理及实现(share)

转载自:http://blog.codinglabs.org/articles/how-web-analytics-data-collection-system-work.html 网站数据统计分析工具是网站站长和运营人员经常使用的一种工具,比较常用的有谷歌分析.百度统计和腾讯分析等等.所有这些统计分析工具的第一步都是网站访问数据的收集.目前主流的数据收集方式基本都是基于javascript的.本文将简要分析这种数据收集的原理,并一步一步实际搭建一个实际的数据收集系统. 数据收集原理分析 简单来

Hadoop Web项目--Friend Find系统

项目使用软件:Myeclipse10.0,JDK1.7,Hadoop2.6,MySQL5.6,EasyUI1.3.6,jQuery2.0,Spring4.1.3, Hibernate4.3.1,struts2.3.1,Tomcat7 ,Maven3.2.1. 项目下载地址:https://github.com/fansy1990/friend_find ,项目部署参考:http://blog.csdn.net/fansy1990/article/details/46481409 . Hadoop