(一)idea工具开发数据生成模拟程序
1.在idea开发工具中构建weblogs项目,编写数据生成模拟程序。
package main.java;
import java.io.*;
public class ReadWrite {
static String readFileName;
static String writeFileName;
public static void main(String args[]){
readFileName = args[0];
writeFileName = args[1];
try {
// readInput();
readFileByLines(readFileName);
}catch(Exception e){
}
}
public static void readFileByLines(String fileName) {
FileInputStream fis = null;
InputStreamReader isr = null;
BufferedReader br = null;
String tempString = null;
try {
System.out.println("以行为单位读取文件内容,一次读一整行:");
fis = new FileInputStream(fileName);// FileInputStream
// 从文件系统中的某个文件中获取字节
isr = new InputStreamReader(fis,"GBK");
br = new BufferedReader(isr);
int count=0;
while ((tempString = br.readLine()) != null) {
count++;
// 显示行号
Thread.sleep(300);
String str = new String(tempString.getBytes("UTF8"),"GBK");
System.out.println("row:"+count+">>>>>>>>"+tempString);
method1(writeFileName,tempString);
//appendMethodA(writeFileName,tempString);
}
isr.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (isr != null) {
try {
isr.close();
} catch (IOException e1) {
}
}
}
}
public static void method1(String file, String conent) {
BufferedWriter out = null;
try {
out = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(file, true)));
out.write("\n");
out.write(conent);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
2.参照前面idea工具项目打包方式,将该项目打成weblogs.jar包,然后上传至bigdata-pro01.kfk.com节点的/opt/jars目录下(目录需要提前创建)
3.将weblogs.jar分发到另外两个节点
1)在另外两个节点上分别创建/opt/jars目录
mkdir /opt/jars
2)将weblogs.jar分发到另外两个节点
scp weblogs.jar bigdata-pro02.kfk.com:/opt/jars/
scp weblogs.jar bigdata-pro03.kfk.com:/opt/jars/
4.编写运行模拟程序的shell脚本
1)在bigdata-pro02.kfk.com节点的/opt/datas目录下,创建weblog-shell.sh脚本。
vi weblog-shell.sh
#/bin/bash
echo "start log......"
#第一个参数是原日志文件,第二个参数是日志生成输出文件
java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log
修改weblog-shell.sh可执行权限
chmod 777 weblog-shell.sh
2)将bigdata-pro02.kfk.com节点上的/opt/datas/目录拷贝到bigdata-pro03节点.kfk.com
scp -r /opt/datas/ bigdata-pro03.kfk.com:/opt/datas/
3)修改bigdata-pro02.kfk.com和bigdata-pro03.kfk.com节点上面日志采集文件路径。以bigdata-pro02.kfk.com节点为例。
vi flume-conf.properties
agent2.sources = r1
agent2.channels = c1
agent2.sinks = k1
agent2.sources.r1.type = exec
#修改采集日志文件路径,bigdata-pro03.kfk.com节点也是修改此处
agent2.sources.r1.command = tail -F /opt/datas/weblog-flume.log
agent2.sources.r1.channels = c1
agent2.channels.c1.type = memory
agent2.channels.c1.capacity = 10000
agent2.channels.c1.transactionCapacity = 10000
agent2.channels.c1.keep-alive = 5
agent2.sinks.k1.type = avro
agent2.sinks.k1.channel = c1
agent2.sinks.k1.hostname = bigdata-pro01.kfk.com
agent2.sinks.k1.port = 5555
(二)编写启动flume服务程序的shell脚本
1.在bigdata-pro02.kfk.com节点的flume安装目录下编写flume启动脚本。
vi flume-kfk-start.sh
#/bin/bash
echo "flume-2 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console
2.在bigdata-pro03.kfk.com节点的flume安装目录下编写flume启动脚本。
vi flume-kfk-start.sh
#/bin/bash
echo "flume-3 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console
3.在bigdata-pro01.kfk.com节点的flume安装目录下编写flume启动脚本。
vi flume-kfk-start.sh
#/bin/bash
echo "flume-1 start ......"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console
(三)编写Kafka Consumer执行脚本
1.在bigdata-pro01.kfk.com节点的Kafka安装目录下编写Kafka Consumer执行脚本
vi kfk-test-consumer.sh
#/bin/bash
echo "kfk-kafka-consumer.sh start ......"
bin/kafka-console-consumer.sh --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --from-beginning --topic weblogs
2.将kfk-test-consumer.sh脚本分发另外两个节点
scp kfk-test-consumer.sh bigdata-pro02.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/
scp kfk-test-consumer.sh bigdata-pro03.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/
(四)启动模拟程序并测试
在bigdata-pro02.kfk.com节点启动日志产生脚本,模拟产生日志是否正常。
/opt/datas/weblog-shell.sh
(五)启动数据采集所有服务
1.启动Zookeeper服务
bin/zkServer.sh start
2.启动hdfs服务
sbin/start-dfs.sh
3.启动HBase服务
bin/start-hbase.sh
创建hbase业务表
bin/hbase shell
create ‘weblogs‘,‘info‘
4.启动Kafka服务
bin/kafka-server-start.sh config/server.properties &
创建业务数据topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --replication-factor 1 --partitions 1
5.配置flume相关环境变量
vi flume-env.sh
export JAVA_HOME=/opt/modules/jdk1.7.0_67
export HADOOP_HOME=/opt/modules/hadoop-2.5.0
export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0
(六)完成数据采集全流程测试
1.在bigdata-pro01.kfk.com节点上启动flume聚合脚本,将采集的数据分发到Kafka集群和hbase集群。
./flume-kfk-start.sh
2.在bigdata-pro02.kfk.com节点上完成数据采集
1)使用shell脚本模拟日志产生
cd /opt/datas/
./weblog-shell.sh
2)启动flume采集日志数据发送给聚合节点
./flume-kfk-start.sh
3.在bigdata-pro03.kfk.com节点上完成数据采集
1)使用shell脚本模拟日志产生
cd /opt/datas/
./weblog-shell.sh
2)启动flume采集日志数据发送给聚合节点
./flume-kfk-start.sh
4.启动Kafka Consumer查看flume日志采集情况
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning
5.查看hbase数据写入情况
./hbase-shell
count ‘weblogs‘
原文地址:https://www.cnblogs.com/ratels/p/10844870.html