新闻网大数据实时分析可视化系统项目——10、数据采集/存储/分发完整流程测试

(一)idea工具开发数据生成模拟程序

1.在idea开发工具中构建weblogs项目,编写数据生成模拟程序。

package main.java;

import java.io.*;

public class ReadWrite {

static String readFileName;

static String writeFileName;

public static void main(String args[]){

readFileName = args[0];

writeFileName = args[1];

try {

// readInput();

readFileByLines(readFileName);

}catch(Exception e){

}

}

public static void readFileByLines(String fileName) {

FileInputStream fis = null;

InputStreamReader isr = null;

BufferedReader br = null;

String tempString = null;

try {

System.out.println("以行为单位读取文件内容,一次读一整行:");

fis = new FileInputStream(fileName);// FileInputStream

// 从文件系统中的某个文件中获取字节

isr = new InputStreamReader(fis,"GBK");

br = new BufferedReader(isr);

int count=0;

while ((tempString = br.readLine()) != null) {

count++;

// 显示行号

Thread.sleep(300);

String str = new String(tempString.getBytes("UTF8"),"GBK");

System.out.println("row:"+count+">>>>>>>>"+tempString);

method1(writeFileName,tempString);

//appendMethodA(writeFileName,tempString);

}

isr.close();

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

} finally {

if (isr != null) {

try {

isr.close();

} catch (IOException e1) {

}

}

}

}

public static void method1(String file, String conent) {

BufferedWriter out = null;

try {

out = new BufferedWriter(new OutputStreamWriter(

new FileOutputStream(file, true)));

out.write("\n");

out.write(conent);

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

out.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

2.参照前面idea工具项目打包方式,将该项目打成weblogs.jar包,然后上传至bigdata-pro01.kfk.com节点的/opt/jars目录下(目录需要提前创建)

3.将weblogs.jar分发到另外两个节点

1)在另外两个节点上分别创建/opt/jars目录

mkdir /opt/jars

2)将weblogs.jar分发到另外两个节点

scp weblogs.jar bigdata-pro02.kfk.com:/opt/jars/

scp weblogs.jar bigdata-pro03.kfk.com:/opt/jars/

4.编写运行模拟程序的shell脚本

1)在bigdata-pro02.kfk.com节点的/opt/datas目录下,创建weblog-shell.sh脚本。

vi weblog-shell.sh

#/bin/bash

echo "start log......"

#第一个参数是原日志文件,第二个参数是日志生成输出文件

java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

修改weblog-shell.sh可执行权限

chmod 777 weblog-shell.sh

2)将bigdata-pro02.kfk.com节点上的/opt/datas/目录拷贝到bigdata-pro03节点.kfk.com

scp -r /opt/datas/ bigdata-pro03.kfk.com:/opt/datas/

3)修改bigdata-pro02.kfk.com和bigdata-pro03.kfk.com节点上面日志采集文件路径。以bigdata-pro02.kfk.com节点为例。

vi flume-conf.properties

agent2.sources = r1

agent2.channels = c1

agent2.sinks = k1

agent2.sources.r1.type = exec

#修改采集日志文件路径,bigdata-pro03.kfk.com节点也是修改此处

agent2.sources.r1.command = tail -F /opt/datas/weblog-flume.log

agent2.sources.r1.channels = c1

agent2.channels.c1.type = memory

agent2.channels.c1.capacity = 10000

agent2.channels.c1.transactionCapacity = 10000

agent2.channels.c1.keep-alive = 5

agent2.sinks.k1.type = avro

agent2.sinks.k1.channel = c1

agent2.sinks.k1.hostname = bigdata-pro01.kfk.com

agent2.sinks.k1.port = 5555

(二)编写启动flume服务程序的shell脚本

1.在bigdata-pro02.kfk.com节点的flume安装目录下编写flume启动脚本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-2 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console

2.在bigdata-pro03.kfk.com节点的flume安装目录下编写flume启动脚本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-3 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console

3.在bigdata-pro01.kfk.com节点的flume安装目录下编写flume启动脚本。

vi flume-kfk-start.sh

#/bin/bash

echo "flume-1 start ......"

bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console

(三)编写Kafka Consumer执行脚本

1.在bigdata-pro01.kfk.com节点的Kafka安装目录下编写Kafka Consumer执行脚本

vi kfk-test-consumer.sh

#/bin/bash

echo "kfk-kafka-consumer.sh start ......"

bin/kafka-console-consumer.sh --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --from-beginning --topic weblogs

2.将kfk-test-consumer.sh脚本分发另外两个节点

scp kfk-test-consumer.sh bigdata-pro02.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

scp kfk-test-consumer.sh bigdata-pro03.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

(四)启动模拟程序并测试

在bigdata-pro02.kfk.com节点启动日志产生脚本,模拟产生日志是否正常。

/opt/datas/weblog-shell.sh

(五)启动数据采集所有服务

1.启动Zookeeper服务

bin/zkServer.sh start

2.启动hdfs服务

sbin/start-dfs.sh

3.启动HBase服务

bin/start-hbase.sh

创建hbase业务表

bin/hbase shell

create ‘weblogs‘,‘info‘

4.启动Kafka服务

bin/kafka-server-start.sh config/server.properties &

创建业务数据topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --replication-factor 1 --partitions 1

5.配置flume相关环境变量

vi flume-env.sh

export JAVA_HOME=/opt/modules/jdk1.7.0_67

export HADOOP_HOME=/opt/modules/hadoop-2.5.0

export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0

(六)完成数据采集全流程测试

1.在bigdata-pro01.kfk.com节点上启动flume聚合脚本,将采集的数据分发到Kafka集群和hbase集群。

./flume-kfk-start.sh

2.在bigdata-pro02.kfk.com节点上完成数据采集

1)使用shell脚本模拟日志产生

cd /opt/datas/

./weblog-shell.sh

2)启动flume采集日志数据发送给聚合节点

./flume-kfk-start.sh

3.在bigdata-pro03.kfk.com节点上完成数据采集

1)使用shell脚本模拟日志产生

cd /opt/datas/

./weblog-shell.sh

2)启动flume采集日志数据发送给聚合节点

./flume-kfk-start.sh

4.启动Kafka Consumer查看flume日志采集情况

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning

5.查看hbase数据写入情况

./hbase-shell

count ‘weblogs‘

原文地址:https://www.cnblogs.com/ratels/p/10844870.html

时间: 2024-10-07 04:24:04

新闻网大数据实时分析可视化系统项目——10、数据采集/存储/分发完整流程测试的相关文章

新闻网大数据实时分析可视化系统项目——12、Hive与HBase集成进行数据分析

(一)Hive 概述 (二)Hive在Hadoop生态圈中的位置 (三)Hive 架构设计 (四)Hive 的优点及应用场景 (五)Hive 的下载和安装部署 1.Hive 下载 Apache版本的Hive. Cloudera版本的Hive. 这里选择下载Apache稳定版本apache-hive-0.13.1-bin.tar.gz,并上传至bigdata-pro03.kfk.com节点的/opt/softwares/目录下. 2.解压安装hive tar -zxf apache-hive-0.

新闻网大数据实时分析可视化系统项目——9、Flume+HBase+Kafka集成与开发

1.下载Flume源码并导入Idea开发工具 1)将apache-flume-1.7.0-src.tar.gz源码下载到本地解压 2)通过idea导入flume源码 打开idea开发工具,选择File——>Open 然后找到flume源码解压文件,选中flume-ng-hbase-sink,点击ok加载相应模块的源码. 2.官方flume与hbase集成的参数介绍 3.下载日志数据并分析 到搜狗实验室下载用户查询日志 1)介绍 搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索

BI大数据智能可视化大屏分析系统建设软件开发

要建设企业级大数据可视化分析系统,需要构建企业统一的数据库体系或者直接将已有数据库对接.进行数据建模,为数据分析可视化呈现奠定基础.通过数据分析管理系统,有了数据基础,就可以构建BI大数据智能可视化大屏分析,满足企业的业务需求,提升数据价值. BI大数据智能可视化大屏分析系统建设软件开发的技术实现: 1.Hadoop:使用 hadoop作为系统的基础框架,对数据进行分布式的存储和分析.HDFS是 hadoop提供的分布式存储系统,它对体积巨大的数据切分成多个小块存储的不同的节点,每个块又做了多个

大数据高并发系统架构实战方案

大数据高并发系统架构实战方案(LVS负载均衡.Nginx.共享存储.海量数据.队列缓存 ) 随着互联网的发展,高并发.大数据量的网站要求越来越高.而这些高要求都是基础的技术和细节组合而成的.本课程就从实际案例出发给大家原景重现高并发架构常用技术点及详细演练.通过该课程的学习,普通的技术人员就可以快速搭建起千万级的高并发大数据网站平台,课程涉及内容包括:LVS实现负载均衡.Nginx高级配置实战.共享存储实现动态内容静态化加速实战.缓存平台安装配置使用.mysql主从复制安装配置实战等.课程二十.

大数据、云计算系统顶级架构师课程学习视频

本课程为大数据.云计算系统架构师高级培训课程,授课模式为线上视频+直播答疑,本套教程2000多节课,里面的hadoop.spark都是新版本 6个阶段共31部分:1.Linux基础2.大数据基础Hadoop 2.X3.大数据仓库Hive4.大数据协作框架5.分布式数据库HBase6.Storm流计算从入门到精通之技术篇7.Scala语言从入门到精通8.内存计算框架Spark9.Spark深入剖析10.企业大数据平台11.驴妈妈旅游网大型离线数据电商分析平台12.Storm流计算之项目篇13.某团

大数据从基础到项目实战(一站式全链路最佳学习路径)

大数据从基础到项目实战(一站式全链路最佳学习路径)课程链接:https://pan.baidu.com/s/1HC9zqxwUFNBJHT9zP1dlvg 密码:xdgd 本课程为就业课程,以完整的实战项目为主线,项目各个环节既深入讲解理论知识,又结合项目业务进行实操,从而达到一站式学习,让你快速达到就业水平. 全真企业项目全流程演示: 大数据生产->采集->存储->处理->计算->分析(离线+实时)->抽取(离线+实时)->Java接口->可视化Web展示

人工智能、大数据与复杂系统 全部课程

人工智能.大数据与复杂系统[下载地址:https://pan.baidu.com/s/1dg8F4hSTTaPDUpDpd3AqWA ] 黑科技,人工智能前进之路势不可挡! "做大做强新兴产业集群,实施大数据发展行动,加强新一代人工智能研发应用.发展智能产业,拓展智能生活." 人工智能已作为国家乃至全球新的经济增长动力,重要性不言而喻.世界经济论坛有个数据,AI企业的全球投资已经从2011年的2.8亿美元增长至2017年的超过40亿美元,增长势头愈发强劲.人工智能竞争是全球化的竞争,也

Spark进阶 大数据离线与实时项目实战 完整版

第1章 课程介绍&学习指南本章会对这门课程进行说明并进行学习方法介绍. 第2章 Redis入门Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度.本章将从Redis特性.应用场景出发,到Redis的基础命令,再到Redis的常用数据类型实操,最后通过Java API来操作Redis,为后续实时处理项目打下坚实的基础... 第3章 HBase入门HBase是一个分布式的.面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:

王家林亲传《DT大数据梦工厂》第二讲Scala函数定义、流程控制、异常处理入门

你想了解大数据,你想成为年薪百万吗?那你还等着什么,快点来吧!跟着王家林老师学习spark大数据 第二讲主要讲了Scala函数定义.流程控制.异常处理入门 函数定义: 关键字(def) 函数名称 参数(参数名称:参数类型):返回内容类型  =  { 函数体 } 注意: Unit:空的返回内容 Scala结束语是不需要写分号 下面一代码为例: //不带参数 Object  ScalaBasics{ def doWhile(){ var line = “” do{ line = readLine()