新闻网大数据实时分析可视化系统项目——9、Flume+HBase+Kafka集成与开发

1.下载Flume源码并导入Idea开发工具

1)将apache-flume-1.7.0-src.tar.gz源码下载到本地解压

2)通过idea导入flume源码

打开idea开发工具,选择File——》Open

然后找到flume源码解压文件,选中flume-ng-hbase-sink,点击ok加载相应模块的源码。

2.官方flume与hbase集成的参数介绍

3.下载日志数据并分析

到搜狗实验室下载用户查询日志

1)介绍

搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。为进行中文搜索引擎用户行为分析的研究者提供基准研究语料

2)格式说明

数据格式为:访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL

其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID

4.flume agent-3聚合节点与HBase集成的配置

vi flume-conf.properties

agent1.sources = r1

agent1.channels = kafkaC hbaseC

agent1.sinks = kafkaSink hbaseSink

agent1.sources.r1.type = avro

agent1.sources.r1.channels = hbaseC

agent1.sources.r1.bind = bigdata-pro01.kfk.com

agent1.sources.r1.port = 5555

agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory

agent1.channels.hbaseC.capacity = 100000

agent1.channels.hbaseC.transactionCapacity = 100000

agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase

agent1.sinks.hbaseSink.table = weblogs

agent1.sinks.hbaseSink.columnFamily = info

agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer

agent1.sinks.hbaseSink.channel = hbaseC

agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

5.对日志数据进行格式处理

1)将文件中的tab更换成逗号

cat weblog.log|tr "\t" "," > weblog2.log

2)将文件中的空格更换成逗号

cat weblog2.log|tr " " "," > weblog3.log

6.自定义SinkHBase程序设计与开发

1)模仿SimpleAsyncHbaseEventSerializer自定义KfkAsyncHbaseEventSerializer实现类,修改一下代码即可。

@Override

public List getActions() {

List actions = new ArrayList();

if (payloadColumn != null) {

byte[] rowKey;

try {

/*---------------------------代码修改开始---------------------------------*/

//解析列字段

String[] columns = new String(this.payloadColumn).split(",");

//解析flume采集过来的每行的值

String[] values = new String(this.payload).split(",");

for(int i=0;i < columns.length;i++){

byte[] colColumn = columns[i].getBytes();

byte[] colValue = values[i].getBytes(Charsets.UTF_8);

//数据校验:字段和值是否对应

if(colColumn.length != colValue.length) break;

//时间

String datetime = values[0].toString();

//用户id

String userid = values[1].toString();

//根据业务自定义Rowkey

rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);

//插入数据

PutRequest putRequest =  new PutRequest(table, rowKey, cf,

colColumn, colValue);

actions.add(putRequest);

/*---------------------------代码修改结束---------------------------------*/

}

} catch (Exception e) {

throw new FlumeException("Could not get row key!", e);

}

}

return actions;

}

2)在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法

/**

* 自定义Rowkey

* @param userid

* @param datetime

* @return

* @throws UnsupportedEncodingException

*/

public static byte[] getKfkRowKey(String userid,String datetime)throws UnsupportedEncodingException {

return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");

}

7.自定义编译程序打jar包

1)在idea工具中,选择File——》ProjectStructrue

2)左侧选中Artifacts,然后点击右侧的+号,最后选择JAR——》From modules with dependencies

3)然后直接点击ok

4)删除其他依赖包,只把flume-ng-hbase-sink打成jar包就可以了。

5)然后依次点击apply,ok

6)点击build进行编译,会自动打成jar包

7)到项目的apache-flume-1.7.0-src\flume-ng-sinks\flume-ng-hbase-sink\classes\artifacts\flume_ng_hbase_sink_jar目录下找到刚刚打的jar包

8)将打包名字替换为flume自带的包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至flume/lib目录下,覆盖原有的jar包即可。

8.flume聚合节点与Kafka集成的配置

vi flume-conf.properties

agent1.sources = r1

agent1.channels = kafkaC hbaseC

agent1.sinks = kafkaSink hbaseSink

agent1.sources.r1.type = avro

agent1.sources.r1.channels = hbaseC kafkaC

agent1.sources.r1.bind = bigdata-pro01.kfk.com

agent1.sources.r1.port = 5555

agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory

agent1.channels.hbaseC.capacity = 100000

agent1.channels.hbaseC.transactionCapacity = 100000

agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase

agent1.sinks.hbaseSink.table = weblogs

agent1.sinks.hbaseSink.columnFamily = info

agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer

agent1.sinks.hbaseSink.channel = hbaseC

agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

#*****************flume+Kafka***********************

agent1.channels.kafkaC.type = memory

agent1.channels.kafkaC.capacity = 100000

agent1.channels.kafkaC.transactionCapacity = 100000

agent1.channels.kafkaC.keep-alive = 20

agent1.sinks.kafkaSink.channel = kafkaC

agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink

agent1.sinks.kafkaSink.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092

agent1.sinks.kafkaSink.topic = test

agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181

agent1.sinks.kafkaSink.requiredAcks = 1

agent1.sinks.kafkaSink.batchSize = 1

agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

原文地址:https://www.cnblogs.com/ratels/p/10844847.html

时间: 2024-08-03 00:07:38

新闻网大数据实时分析可视化系统项目——9、Flume+HBase+Kafka集成与开发的相关文章

新闻网大数据实时分析可视化系统项目——10、数据采集/存储/分发完整流程测试

(一)idea工具开发数据生成模拟程序 1.在idea开发工具中构建weblogs项目,编写数据生成模拟程序. package main.java; import java.io.*; public class ReadWrite { static String readFileName; static String writeFileName; public static void main(String args[]){ readFileName = args[0]; writeFileNa

新闻网大数据实时分析可视化系统项目——12、Hive与HBase集成进行数据分析

(一)Hive 概述 (二)Hive在Hadoop生态圈中的位置 (三)Hive 架构设计 (四)Hive 的优点及应用场景 (五)Hive 的下载和安装部署 1.Hive 下载 Apache版本的Hive. Cloudera版本的Hive. 这里选择下载Apache稳定版本apache-hive-0.13.1-bin.tar.gz,并上传至bigdata-pro03.kfk.com节点的/opt/softwares/目录下. 2.解压安装hive tar -zxf apache-hive-0.

基于大数据的电影网站项目开发之HBase分布式安装(四)

1.hbase解压,通过xftp将hbase-1.0.1.1-bin.tar.gz上传到虚拟机中 通过tar -zxvf hbase-1.0.1.1-bin.tar.gz解压到soft目录下 2. 设置环境变量 HBASE_HOME=/home/meng/soft/hbase-1.0.1.1 export PATH=$PATH:$HBASE_HOME/bin 3.hbase-env.sh中有如下属性: export JAVA_HOME=/usr/java/jdk1.6 将其开启并修改环境变量ex

BI大数据智能可视化大屏分析系统建设软件开发

要建设企业级大数据可视化分析系统,需要构建企业统一的数据库体系或者直接将已有数据库对接.进行数据建模,为数据分析可视化呈现奠定基础.通过数据分析管理系统,有了数据基础,就可以构建BI大数据智能可视化大屏分析,满足企业的业务需求,提升数据价值. BI大数据智能可视化大屏分析系统建设软件开发的技术实现: 1.Hadoop:使用 hadoop作为系统的基础框架,对数据进行分布式的存储和分析.HDFS是 hadoop提供的分布式存储系统,它对体积巨大的数据切分成多个小块存储的不同的节点,每个块又做了多个

大数据高并发系统架构实战方案

大数据高并发系统架构实战方案(LVS负载均衡.Nginx.共享存储.海量数据.队列缓存 ) 随着互联网的发展,高并发.大数据量的网站要求越来越高.而这些高要求都是基础的技术和细节组合而成的.本课程就从实际案例出发给大家原景重现高并发架构常用技术点及详细演练.通过该课程的学习,普通的技术人员就可以快速搭建起千万级的高并发大数据网站平台,课程涉及内容包括:LVS实现负载均衡.Nginx高级配置实战.共享存储实现动态内容静态化加速实战.缓存平台安装配置使用.mysql主从复制安装配置实战等.课程二十.

大数据、云计算系统顶级架构师课程学习视频

本课程为大数据.云计算系统架构师高级培训课程,授课模式为线上视频+直播答疑,本套教程2000多节课,里面的hadoop.spark都是新版本 6个阶段共31部分:1.Linux基础2.大数据基础Hadoop 2.X3.大数据仓库Hive4.大数据协作框架5.分布式数据库HBase6.Storm流计算从入门到精通之技术篇7.Scala语言从入门到精通8.内存计算框架Spark9.Spark深入剖析10.企业大数据平台11.驴妈妈旅游网大型离线数据电商分析平台12.Storm流计算之项目篇13.某团

大数据从基础到项目实战(一站式全链路最佳学习路径)

大数据从基础到项目实战(一站式全链路最佳学习路径)课程链接:https://pan.baidu.com/s/1HC9zqxwUFNBJHT9zP1dlvg 密码:xdgd 本课程为就业课程,以完整的实战项目为主线,项目各个环节既深入讲解理论知识,又结合项目业务进行实操,从而达到一站式学习,让你快速达到就业水平. 全真企业项目全流程演示: 大数据生产->采集->存储->处理->计算->分析(离线+实时)->抽取(离线+实时)->Java接口->可视化Web展示

人工智能、大数据与复杂系统 全部课程

人工智能.大数据与复杂系统[下载地址:https://pan.baidu.com/s/1dg8F4hSTTaPDUpDpd3AqWA ] 黑科技,人工智能前进之路势不可挡! "做大做强新兴产业集群,实施大数据发展行动,加强新一代人工智能研发应用.发展智能产业,拓展智能生活." 人工智能已作为国家乃至全球新的经济增长动力,重要性不言而喻.世界经济论坛有个数据,AI企业的全球投资已经从2011年的2.8亿美元增长至2017年的超过40亿美元,增长势头愈发强劲.人工智能竞争是全球化的竞争,也

Spark进阶 大数据离线与实时项目实战 完整版

第1章 课程介绍&学习指南本章会对这门课程进行说明并进行学习方法介绍. 第2章 Redis入门Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度.本章将从Redis特性.应用场景出发,到Redis的基础命令,再到Redis的常用数据类型实操,最后通过Java API来操作Redis,为后续实时处理项目打下坚实的基础... 第3章 HBase入门HBase是一个分布式的.面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable: