Flume and Sqoop: Learning and Application

Contents

  • 1. What is Flume?
  • 2. How to set up Flume
  • 3. Flume in practice
  • 4. What is Sqoop?
  • 5. Compute on HBase data and load the results into MySQL with Sqoop

Reference documentation: http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

1. What is Flume?

In short, Flume is a tool for collecting logs: it can gather log data produced by application calls, RPC, and web page activity. It is a distributed, open-source project written in Java and maintained by Apache.

2. How to set up Flume

Prerequisite for the setup: a JDK is already installed (section 2.2 points Flume at it).

2.1 Download and extract to a target directory

In the spirit of teaching people to fish, I will only describe how to download it rather than paste a direct link. Open the official site, http://flume.apache.org; the Download link is usually at the top or on the left (here it is on the left), and from there you can pick the version you want. The official download can be slow; if that bothers you, search for a software mirror site and download from there. Anything under the Apache umbrella usually sits in a dedicated apache directory on those mirrors.

You can download locally and upload the archive over FTP, or download it directly on the server.

I extracted the archive under /opt on the server and renamed the directory to flume (you can keep the original name and simply set the environment variables to match the actual path).

2.2 Configure the Java path in flume-env.sh

Go to the conf directory, edit flume-env.sh, and point export JAVA_HOME at your actual JDK path.

cd /opt/flume/conf/

vim flume-env.sh

export JAVA_HOME=/opt/java/jdk1.8.0_221

2.3 Add the Flume environment variables

Environment variables record where the software actually lives; when you run a command, the shell uses them to find the executable, and without them the command fails. (Any software installed by manually uploading it to a Linux server needs its environment variables configured.)

vim /etc/profile

export FLUME_HOME=/opt/flume

export FLUME_CONF_DIR=/opt/flume/conf

export PATH=$PATH:$FLUME_HOME/bin

source /etc/profile

2.4 Verify the setup with flume-ng version

Run flume-ng version directly in the console.

If it prints something like Flume 1.6.0, you are done; any Error messages that appear alongside it can be ignored for now.

3. Flume in practice

A log-collection system:

3.1 Set up the nginx environment

See the Runoob tutorial: https://www.runoob.com/linux/nginx-install-setup.html

After installing nginx as described there, configure its access-log format:

Edit nginx.conf; the default install path is /etc/nginx.

cd /etc/nginx

vim nginx.conf

Add the following inside the http block:

Explanation: ^A is the field separator; remote_addr is the client address, msec the request time, http_host the requested host, and request_uri the requested resource.

log_format my_format '$remote_addr^A$msec^A$http_host^A$request_uri';

Add the following inside the server block:

Explanation: the request path is domain/log.gif, the response type is image/gif, and the access log is written to /opt/data/access.log using my_format.

location = /log.gif { default_type image/gif; access_log /opt/data/access.log my_format; }

After that, each request to nginx produces a log line like the following:

192.168.40.1^A1577365502.563^Atuge1^A/log.gif?en=e_crt&oid=123456&on=%E6%B5%8B%E8%AF%95%E8%AE%A2%E5%8D%95123456&cua=524.01&cut=RMB&pt=alipay&ver=1&pl=website&sdk=js&u_ud=039F6588-ED65-4187-87CF-9DBBC9F19645&u_mid=zhangsan&u_sd=605DECAA-93C0-46B7-AC47-7B1898DBD6BC&c_time=1577365502881&l=zh-CN&b_iev=Mozilla%2F5.0%20(Windows%20NT%2010.0%3B%20WOW64)%20AppleWebKit%2F537.36%20(KHTML%2C%20like%20Gecko)%20Chrome%2F78.0.3904.97%20Safari%2F537.36&b_rst=1536*86

3.2 Write the code that triggers events and sends the data

Approach:

  • The front end fires an event and sends the data as an image request (the log.gif endpoint above).
  • The back end sends a GET request directly through the API (a sketch follows the code link below).

Code repository: https://gitee.com/shuai7boy/BIG_DATA_LOG
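For the back-end path, the idea is roughly the Java sketch below. It is not the repository code; the host name tuge1 and the event fields are assumptions carried over from the nginx setup and the sample log line above.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class LogSender {

    // Send one event to the nginx log endpoint; nginx writes it into access.log,
    // which Flume then ships to HDFS.
    public static void send(String memberId, String orderId) throws IOException {
        String query = "en=e_crt"
                + "&oid=" + URLEncoder.encode(orderId, StandardCharsets.UTF_8.name())
                + "&u_mid=" + URLEncoder.encode(memberId, StandardCharsets.UTF_8.name())
                + "&pl=website&sdk=jdk"
                + "&c_time=" + System.currentTimeMillis();
        URL url = new URL("http://tuge1/log.gif?" + query);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        // The response body (a 1x1 gif) does not matter; only the status code is checked.
        int code = conn.getResponseCode();
        conn.disconnect();
        if (code != 200) {
            throw new IOException("log request failed, HTTP " + code);
        }
    }

    public static void main(String[] args) throws IOException {
        send("zhangsan", "123456");
    }
}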

3.3 Configure the Flume environment

  • Download

    Download from the official site, upload the archive to the server, and extract it.

  • Configure the Java environment variable in the Flume config file

    Edit flume-env.sh. Can't find it? No problem; just rename flume-env.sh.template.

  • Configure the Flume environment variables

    vim /etc/profile

    export FLUME_HOME=/opt/flume
    export FLUME_CONF_DIR=/opt/flume/conf

    export PATH=$PATH:$FLUME_HOME/bin

  • Verify the configuration

    flume-ng version

    If it prints output like the following, the configuration succeeded.

    Flume 1.6.0
    Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
    Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
    Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
    From source with checksum b29e416802ce9ece3269d34233baf43f

3.4 Run Flume and write the local log into HDFS

Write the Flume configuration; the official example is a good reference: http://flume.apache.org/releases/content/1.6.0/FlumeUserGuide.html

On the Linux server, create a configuration file for the monitoring agent (call it optionHdfs.conf):

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# Tail the monitored file; newly appended content is shipped to HDFS.
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/data/access.log

# Describe the sink
# The HDFS path must point at the active NameNode.
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://tuge2:9000/flume/webdata/%Y-%m-%d
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Run Flume, again following the official guide: flume-ng agent --conf conf --conf-file optionHdfs.conf --name a1 -Dflume.root.logger=INFO,console

Once it is running, the log content is written into HDFS according to the rules in the configuration file.

3.5 Write the ETL code that loads the HDFS content into HBase

Approach:

  • Use MapReduce to read the content from HDFS, split each line on the separator, and write the fields into HBase (a sketch follows the code link below).

Code repository: https://gitee.com/shuai7boy/BIG_DATA_ETL
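A rough illustration of that mapper follows; it is not the repository code, and it assumes the HBase 1.x client API, the eventlog table with a log column family, and an illustrative row key.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogToHBaseMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static final byte[] FAMILY = Bytes.toBytes("log");

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The nginx line is ^A (\u0001) separated: ip ^A timestamp ^A host ^A request_uri
        String[] fields = value.toString().split("\u0001");
        if (fields.length < 4) {
            return; // skip malformed lines
        }
        // Illustrative row key; the real project derives it from the event fields.
        Put put = new Put(Bytes.toBytes(fields[1] + "_" + fields[0]));
        put.addColumn(FAMILY, Bytes.toBytes("ip"), Bytes.toBytes(fields[0]));
        put.addColumn(FAMILY, Bytes.toBytes("s_time"), Bytes.toBytes(fields[1]));
        put.addColumn(FAMILY, Bytes.toBytes("http_host"), Bytes.toBytes(fields[2]));
        put.addColumn(FAMILY, Bytes.toBytes("request_uri"), Bytes.toBytes(fields[3]));
        context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
}

The job would point its output at the eventlog table through TableOutputFormat; the real project additionally splits request_uri into the individual event parameters (en, u_ud, u_sd, and so on).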

3.6 Use MapReduce to analyse the HBase data and load the results into MySQL

Metrics and dimensions:

  • Active users over a given period
  • Active users over a given period for a given browser
  • New users over a given period
  • New users over a given period for a given browser

The discussion below focuses on active users over a period and active users per browser.

Approach (a map/reduce sketch follows the code link at the end of this list):

  • Define the source:

    In the Runner class, define HBase as the input source.

  • Define the dimensions:

    Create classes for the dimensions: user activity, user activity per platform, new users, and new users per platform.

  • Map:

    Extend TableMapper.

    Map each row read from the source into dimension + user information.

  • Reduce:

    Extend Reducer.

    Aggregate into dimension + de-duplicated users.

  • Write to MySQL:

    Extend OutputFormat and override getRecordWriter, checkOutputSpecs, and getOutputCommitter.

    Extend RecordWriter and override write and close.

    Implement IDimensionConverter and override getDimensionIdByValue and executeSql.

    Define the MySQL output in the Runner. The SQL statements live in a configuration file and are looked up per dimension.

    First check whether each dimension already exists and insert it if it does not; then insert or update the statistics (flushed once every 10 mapped records).

    Code repository: https://gitee.com/shuai7boy/BIG_DATA_TOMYSQL
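A minimal sketch of the map and reduce steps for the active-user metric is below. It assumes the log column family and plain Text keys; the real project uses its own dimension key/value writables and the IDimensionConverter machinery instead.

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ActiveUserJob {

    // Map: one HBase row (one event) -> key "platform:day", value user id.
    public static class ActiveUserMapper extends TableMapper<Text, Text> {
        private static final byte[] FAMILY = Bytes.toBytes("log");
        private final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");

        @Override
        protected void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String platform = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("pl")));
            String serverTime = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("s_time")));
            String uuid = Bytes.toString(value.getValue(FAMILY, Bytes.toBytes("u_ud")));
            if (platform == null || serverTime == null || uuid == null) {
                return; // skip incomplete events
            }
            String day = dayFormat.format(new Date(Long.parseLong(serverTime)));
            context.write(new Text(platform + ":" + day), new Text(uuid));
        }
    }

    // Reduce: de-duplicate the user ids per dimension and emit the count.
    public static class ActiveUserReducer extends Reducer<Text, Text, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Set<String> users = new HashSet<>();
            for (Text v : values) {
                users.add(v.toString());
            }
            context.write(key, new LongWritable(users.size()));
        }
    }
}

The Runner would wire the mapper up with TableMapReduceUtil.initTableMapperJob over the eventlog table and point the output at the custom OutputFormat that writes to MySQL, as described in the list above.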

4. What is Sqoop?

Sqoop is a tool for importing and exporting data: it can move data from relational databases such as MySQL and Oracle into HDFS, Hive, and HBase, and back out again.

Official documentation: http://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html
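As a quick taste of the import direction, a typical Sqoop import looks roughly like the following; the connection string, table name, and target directory are placeholders, not values from this project. Section 5 below goes the other way and uses sqoop export.

sqoop import --connect jdbc:mysql://tuge1:3306/test_db --username root --password 123456 --table orders --target-dir /user/data/orders --fields-terminated-by '\t' -m 1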

5. Compute on HBase data and load the results into MySQL with Sqoop

5.0 Install Sqoop

1. Extract Sqoop on the Linux server

I use version 1.4.6 here. **Make sure you download the sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz archive, not sqoop-1.4.6.tar.gz; the latter is missing pieces.**

Download address: http://archive.apache.org/dist/sqoop/1.4.6/

After downloading, upload the file to the Linux server with an FTP tool (I put it under /opt/sqoop),

then extract it with tar -xvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz.

2. Rename the configuration file

Go to the conf directory:

mv sqoop-env-template.sh sqoop-env.sh

3. Configure the environment variables

vim /etc/profile

Add:

export SQOOP_HOME=/opt/sqoop/sqoop-1.4.6

export PATH=$PATH:$SQOOP_HOME/bin

4. Verify the installation

sqoop --version

(If an error appears, adjust bin/configure-sqoop according to the message.)

5.1 Create the Hive tables that hold the data

HBase table structure: Row, Name (column family + qualifier), timestamp, Value

5.1.0 Create the Hive table that maps to the HBase eventlog table and link the two tables

Official documentation on the Hive/HBase integration: https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration

CREATE EXTERNAL TABLE event_logs(
key string, pl string, en string, s_time bigint, p_url string, u_ud string, u_sd string
) ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe'
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,log:pl,log:en,log:s_time,log:p_url,log:u_ud,log:u_sd')
tblproperties('hbase.table.name'='eventlog');

5.1.1 Create an intermediate table (note: its structure must match the MySQL table; the computed results go here and are then synced to MySQL)

CREATE TABLE stats_view_depth (
platform_dimension_id bigint ,
data_dimension_id bigint ,
kpi_dimension_id bigint ,
pv1 bigint ,
pv2 bigint ,
pv3 bigint ,
pv4 bigint ,
pv5_10 bigint ,
pv10_30 bigint ,
pv30_60 bigint ,
pv60_plus bigint ,
created string
) row format delimited fields terminated by '\t';

5.1.2 Create a temporary table (note: it holds the intermediate results)

CREATE TABLE stats_view_depth_tmp(pl string, date string, col string, ct bigint);

5.2 Write the platformdimension and datedimension UDFs

Note: the class must extend UDF.

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// DateDimension, DimensionConverImpl, IDimensionConverter, TimeUtil and DateEnum
// come from the project itself and need their own imports.
public class DateDimensionUDF extends UDF {
    private IDimensionConverter dimension = new DimensionConverImpl();

    public IntWritable evaluate(Text txt) {
        try {
            // Build a day-level date dimension from the incoming date string.
            DateDimension dateDimension = DateDimension.buildDate(
                    TimeUtil.parseString2Long(txt.toString()), DateEnum.DAY);
            // Look up (or create) the dimension id and return it.
            int id = dimension.getDimensionIdByValue(dateDimension);
            return new IntWritable(id);
        } catch (IOException ex) {
            throw new RuntimeException("failed to get the datedimension id", ex);
        }
    }
}

Package the code into a jar and upload it to the Linux server.

5.3 Create the Hive function

create function date_convert as 'shuai7boy.vip.transformer.hive.DateDimensionUDF' using jar 'hdfs://tuge2:9000/transform/transform-0.0.1.jar';

At first I left the port number out and got java.lang.IllegalArgumentException: java.net.UnknownHostException: transform; the post at https://blog.csdn.net/heming621/article/details/53317562 helped me fix it.

5.4 Write the HQL statements for the analysis

5.4.1 Count page views from the user's perspective

We loaded the HDFS data into HBase above and linked the Hive table to the HBase table, and since Hive supports HQL, the analysis can be done directly in Hive with HQL statements.

Compute the users' browsing depth:

from (
select
pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as day, u_ud,
(case when count(p_url) = 1 then "pv1"
when count(p_url) = 2 then "pv2"
when count(p_url) = 3 then "pv3"
when count(p_url) = 4 then "pv4"
when count(p_url) >= 5 and count(p_url) <10 then "pv5_10"
when count(p_url) >= 10 and count(p_url) <30 then "pv10_30"
when count(p_url) >=30 and count(p_url) <60 then "pv30_60"
else 'pv60_plus' end) as pv
from event_logs
where
en='e_pv'
and p_url is not null
and pl is not null
and s_time >= unix_timestamp('2016-06-08','yyyy-MM-dd')*1000
and s_time < unix_timestamp('2016-06-09','yyyy-MM-dd')*1000
group by
pl, from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'), u_ud
) as tmp
insert overwrite table stats_view_depth_tmp
select pl,day,pv,count(distinct u_ud) as ct where u_ud is not null group by pl,day,pv;

-- pivot the rows into columns

with tmp as
(
select pl,date as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select pl,date as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select pl,date as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus' union all

select 'all' as pl,date as date1,ct as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv1' union all
select 'all' as pl,date as date1,0 as pv1,ct as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv2' union all
select 'all' as pl,date as date1,0 as pv1,0 as pv2,ct as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv3' union all
select 'all' as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,ct as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv4' union all
select 'all' as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,ct as pv5_10,0 as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv5_10' union all
select 'all' as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,ct as pv10_30,0 as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv10_30' union all
select 'all' as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,ct as pv30_60,0 as pv60_plus from stats_view_depth_tmp where col='pv30_60' union all
select 'all' as pl,date as date1,0 as pv1,0 as pv2,0 as pv3,0 as pv4,0 as pv5_10,0 as pv10_30,0 as pv30_60,ct as pv60_plus from stats_view_depth_tmp where col='pv60_plus'
)
from tmp
insert overwrite table stats_view_depth
select 2,date_convert(date1),6,sum(pv1),sum(pv2),sum(pv3),sum(pv4),sum(pv5_10),sum(pv10_30),sum(pv30_60),sum(pv60_plus),'2017-01-10' group by pl,date1;

Running it may raise the error: Failed to recognize predicate 'xxx'. Failed rule: 'identifier' in column specification.

Fix: the cause is using the reserved keyword date as a column name; disabling reserved-keyword checking works around it.

Add the following property to hive-site.xml:

<property>
  <name>hive.support.sql11.reserved.keywords</name>
  <value>false</value>
</property>

Reference: https://blog.csdn.net/sjf0115/article/details/73244762

5.5 Sync the Hive data into MySQL with Sqoop

Exit the Hive shell and run the following:

sqoop export --connect jdbc:mysql://tuge1:3306/result_db --username root --password 123456 --table stats_view_depth --export-dir /user/hive/warehouse/stats_view_depth/* --input-fields-terminated-by "\t" --update-mode allowinsert --update-key platform_dimension_id,data_dimension_id,kpi_dimension_id;

The data then shows up in MySQL; what remains is rendering it in a front end, which is not covered here.

Original article: https://www.cnblogs.com/shun7man/p/12227753.html
