介绍:&emsp本次项目是基于企业大数据经典案例项目(大数据日志分析),全方位、全流程讲解 大数据项目的业务分析、技术选型、架构设计、集群规划、安装部署、整合继承与开发和web可视化交互设计。
一、业务需求
?? (一)捕获用户浏览日志信息
?? (二)实时分析前20名流量最高的新闻话题
?? (三)实时统计当前线上已曝光的新闻话题
?? (四)统计哪个时段用户浏览量最高
二、系统架构
三、集群规划
四、数据源介绍
五、项目实战
1)离线采集数据
?架构:flume+hbase+hive
??数据是实时的用户查询日志信息,为了做离线的统计,需要将数据存储HDFS,写入的压力非常大,单位时间内写入的数据量比较大,直接向hdfs中写入的效率比较低;而hbase,高速随机读写的数据库,这里使用hbase去接收flume传送过来的数据,当数据被传入hbase之后,我们可以使用hive去关联hbase中的数据。
hbase建表:create ‘zy_news_log‘, ‘cf‘
2)关联flume+hbase
?使用sink为hbase或者asynchbase,但是这两种方式其中定义EventSerializer满足不了我们的需求,日志数据有6列,数据以\t分割,但是默认的hbasesink只能做到一列,所以只能自定义hbase-sink。
自定义sink源代码:
编译打包: mvn package -DskipTests
将项目target目录下面的flume-hbase-sink-1.0-SNAPSHOT.jar
放入到flume集群中的lib的目录下。
编写flume:flume-hbase-sink.conf
#########################################################
##主要作用是文件中的新增内容,将数据打入到HBase中
#注意:Flume agent的运行,主要就是配置source channel sink
##下面的a1就是agent的代号,source叫r1 channel叫c1 sink叫k1
#########################################################
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#对于source的配置描述 监听目录中的新增数据
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop1/flume_properties/flume/flume-hbase.txt
#对于sink的配置描述 使用hbase做数据的消费
a1.sinks.k1.type = hbase
a1.sinks.k1.table = news_log
a1.sinks.k1.columnFamily = cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
#对于channel的配置描述 使用内存缓冲区域做数据的临时缓存,文件
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop1/flume_properties/flume/checkpoint
a1.channels.c1.dataDirs = /home/hadoop1/flume_properties/flume/channel
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel =c1
后台启动:flume-ng agent -n a1 -c conf -f flume-hbase-sink.conf -Dflume.root.logger=INFO,console
关联flume+hbase:
hive创建hbase的表:
CREATE EXTERNAL TABLE zy_news_log(
id string,
datetime string,
userid string,
searchname string,
retorder int,
cliorder int,
cliurl string
)
STORED BY ‘org.apache.hadoop.hive.hbase.HBaseStorageHandler‘
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:datetime,cf:userid,cf:searchname,cf:retorder,cf:cliorder,cf:cliurl")
TBLPROPERTIES ("hbase.table.name" = "zy_news_log", "hbase.mapred.output.outputtable" = "zy_news_log");
之后就可以在hive中做一些查询操作,来完成相关业务。
2)实时采集数据
?架构:flume+Kafka
创建topic:
bin/kafka-topics.sh --create --topic zy-news-logs --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3 --replication-factor 3
指定flume的配置文件:flume-kafka-sink.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#对于source的配置描述 监听文件中的新增数据 exec
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/data/projects/news/data/news_log_rt.log
#对于sink的配置描述 使用kafka日志做数据的消费
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
a1.sinks.k1.kafka.topic = zy-news-logs
a1.sinks.k1.kafka.flumeBatchSize = 1000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
#对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
#通过channel c1将source r1和sink k1关联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4)数据的统计分析
? SparkStreaming+Kafka+web+scala+java+maven+hbase/mysql/redis
具体的代码实现可以在小编的博客中下载:
原文地址:http://blog.51cto.com/14048416/2340823