这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zookeeper 主要是为了熟悉一下整个的一个spark-streaming的一个整个流程,还有就是了解调优的地方。
上述假设已经安装好了相应的组件,然后就开始正式的踩坑之路:
1.编写一个java程序去读取原始数据文件,模拟1s进行文件的插入一行,原始的数据文件格式如下:
坑a
.整个的数据格式是json,但是是一整行的。。。。
解决a1:于是就想这去把这样的数据转化为json格式的,就去捣鼓了一下notepad++转json格式的方法:notepad++上面的菜单栏中,插件-> plugins Admin..->search中直接查找就好了,然后找找有个install的按钮点击一下就ok了,然后各种确定,之后notepad++会自动重启,重启之后上面的菜单栏中,插件->就会多出一个JSON Viewer,然后就可以了。但是我操作的时候遇到了notepad++重启之后没有出现JSON Viewer(但是后来又出现了),
解决a2:于是又去找了idea实现json格式的方法:setting->keymap->main enum->code->reformat code 这个功能是将文本格式化,该功能的快捷键默认是ctrl+shift+l,但是这个快捷键组合是有冲突的,所以将其转化为ctrl+shift+s,修改后进行保存,然后创建一个xxx.json的文件,复制一行json数据到该文件中,然后全选,按下ctrl+shift+s即可转化为标准的json文件格式
相应的java实现代码如下:
import java.io.*;import java.util.ArrayList;import java.util.List; public class WriteCMCC { public static void main(String[] args) { List<String> allLines = getCmcc(args[0]); System.out.println(allLines.size()); writeCmcc(allLines, args[1]); } /** * 一次性读取cmcc中的数据 * @return 存放在list中 */ private static List<String> getCmcc(String path) { BufferedReader br = null; List<String> allLines = new ArrayList<String>(); try { br = new BufferedReader(new FileReader(new File(path))); String line = ""; while ((line = br.readLine()) != null) { allLines.add(line); } } catch (Exception e) { e.printStackTrace(); }finally { try { if (br != null) br.close(); } catch (IOException e) { e.printStackTrace(); } } return allLines; } /** * 写入cmcc中的数据,一次写入一个list的数据集 */ private static void writeCmcc(List<String> cmcc, String path) { BufferedWriter bw = null; try { bw = new BufferedWriter(new FileWriter(new File(path))); for(String line : cmcc) { bw.write(line); bw.flush(); Thread.sleep(1000); bw.newLine(); } } catch (Exception e) { e.printStackTrace(); }finally { try { if (bw != null) bw.close(); } catch (IOException e) { e.printStackTrace(); } } }}
代码写好,然后测试完,然后打成jar包,丢到Linux准备运行。
java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.log
2.flume编写相应的conf去把数据抽取到kafka中(cmcc.conf)
先启动zookeeper,启动kafka并创建topic(cmcc):
zookeeper启动命令:
/home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每个节点都需要启动)
kafka启动命令:
/home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
kafka创建topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor
kafka查看所有的topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list
然后编写conf测试(cmcc.conf):
a1.sources = s1
a1.channels = c1
#这里先不使用该种方式去读取文件,因为该方式flume会出如下的错误
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#原因:出现这个问题的原因是,当我们拷贝一个文件的时候,一些对文件进行了修改
#解决:最好的方法就是,确保大文件完全拷贝后,再让flume来读取,思路是将拷贝中的文件加上一个多余的后缀,flume一开始不会读取文件,当文件拷贝完成后去掉多余的后缀,这个时候flume就会针对新文件进行读取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true
a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#创建好相应的topic
a1.channels.c1.kafka.topic = cmcc
#这个是自己定义的没啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#这个一定要设置,否则就是个坑,写入到kafka中的数据会被追加进一些数据,而且还是乱码的
a1.channels.c1.parseAsFlumeEvent = false
#拼接source和channel
a1.sources.s1.channels=c1
flume启动命令:下面的a1就对应着上面的a1(控制台打印信息)
bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console
3.spark程序去读取kafka的中的数据并将结果存放至redis中
启动redis:/usr/local/redis/bin/redis-server /usr/local/redis/etc/redis.conf
程序相应的配置:resources -> application.conf
#kafka的相关参数 kafka.topic = "cmcc" kafka.broker.list="os1:9092,os2:9092,os3:9092" kafka.group.id="cmcc" redis.host="xxx.xxx.xxx.xxx" redis.db.index="0"
主程序代码:scala -> BootStarpApp
package app import java.text.SimpleDateFormat import com.alibaba.fastjson.JSON import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import utils.{AppParams, Jpools} object BootStarpApp { def main(args: Array[String]): Unit = { /** * 错误集: * 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer * 错误解释,kafka在进行序列化实例对象的时候出错 * 查找原因: * org.codehaus.jackson.map.deser.std.StringDeserializer是我们AppParas中导入的类型,可能是导错了, * 查看后发现应该导入:import org.apache.kafka.common.serialization.StringDeserializer * 2. 程序出现INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序不再执行下去 * 原因:因为kafka-clent程序默认读取到kafka上的信息之后将host:os3返回作为主机节点去获取数据,但是在本机中没有配置相应的host与ip的映射,所有这里就无法直接进行访问os3 * 解决办法;在windows中配置相应的ip与hostname的映射(kafka中的broker节点) * 3.json解析出错:error parse false * 原因json格式错误 * * 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false 1.7以后默认为true * 如果设置此项为 true,Kafka Sink 则会把数据按照标准的 Flume Event 格式(即Headers域和body域结合的数据结构)发送。Flume Event 中的 Headers 域通常是一些附加字段,可以是时间戳(比如时间戳拦截器指定的时间戳)、文件名(比如 spooldir Source 开启的 fileHeader = true)等信息。但是 1.7.0 版本的 Flume 一旦开启此配置,会导致 Headers 域里面的信息乱码 * * 5.flume异常崩溃 File has been modified since being read * 原因:出现这个问题的原因是,当我们拷贝一个文件的时候,一些对文件进行了修改,就会出现这个错误 * 解决:最好的方法就是,确保大文件完全拷贝后,再让flume来读取,思路是将拷贝中的文件加上一个多余的后缀,flume一开始不会读取文件,当文件拷贝完成后去掉多余的后缀,这个时候flume就会针对新文件进行读取。 * 另外针对大文件,flume的解决方案可以设置一个文件完成后缀: */ val sparkConf = new SparkConf() sparkConf.setAppName("中国移动运营实时监控平台") sparkConf.setMaster("local[*]") /** *将rdd以kryo的序列化保存,以减少内存的使用 */ sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") /** * 对rdd进行压缩,使用内存空间换去处理时间的方式,减少内存的使用 */ sparkConf.set("spark.rdd.compress", "true") /** * */ sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100") /** * 进行优雅的停止程序 */ sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true") /** * 每两秒执行一个批次 */ val ssc = new StreamingContext(sparkConf, Seconds(2)) /** * 获取kafka的数据 * LocationStrategies:位置策略,如果kafka的broker节点与Excutor在同一台机器上给一种策略,不再一台机器上给另一种策略 * 设定策略之后会以最有的策略进行获取数据 * 一般在企业中kafka节点与Excutor不会放到一台机器的,原因是kafka是消息存储的,Executor是用来做消息计算的 * 因此计算与存储需要分开,存储对磁盘要求高,计算对内存和cpu的要求更高 * 如果Executor节点跟Broker的节点在一起的话就使用PreferBrokers策略,不再一起的话就使用preferConsisent策略 * 使用preferConsisent策略的话,将来在kafka中拉去数据以后尽量将数据分散到所有的Executor上 */ val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent , ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams)) stream.foreachRDD(rdd => { /** * { * "bussinessRst": "0000", * "channelCode": "0705", * "chargefee": "10000", * "clientIp": "125.82.117.133", * "endReqTime": "20170412080609613", * "idType": "01", * "interFacRst": "0000", * "logOutTime": "20170412080609613", * "orderId": "384681890175026754", * "prodCnt": "1", * "provinceCode": "280", * "requestId": "20170412080450886738519397327610", * "retMsg": "成功", * "serverIp": "172.16.59.241", * "serverPort": "8088", * "serviceName": "sendRechargeReq", * "shouldfee": "9950", * "startReqTime": "20170412080609503", * "sysId": "15" * } */ /** * 业务逻辑: * serviceName:reChargeNotifyReq,则为充值通知的记录 * requestId:包含充值的日期(订单开始时间) * bussinessRst:是否成功 0000 为成功,其他为不成功 * chargefee:充值的金额 * receiveNotifyTime:订单结束时间 * */ /** * 我们可以通过serviceName字段来确定,如果该字段是reChargeNotifyReq则代表该条数据是充值通知部分的数据。 * 获取所有的充值通知日志 */ val baseData = rdd.map(cr => { print(cr.value()) JSON.parseObject(cr.value()) }).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache() /** * 获取每天充值成功的订单笔数 * 回忆: * wordcount flatMap-》map-》reduceByKey */ val totalSucc = baseData.map(obj=> { //获取日期 val reqId = obj.getString("requestId") //获取日期 val day = reqId.substring(0, 8) //取出该条充值是否成功的标志 val result = obj.getString("bussinessRst") val flag = if(result.equals("0000")) 1 else 0 (day, flag) }).reduceByKey(_+_) /** * 获取充值成功的订单金额 */ val totalMoney = baseData.map(obj=> { val reqId = obj.getString("requestId") //获取日期 val day = reqId.substring(0, 8) //去除该条充值是否成功的标记 val result = obj.getString("bussinessRst") val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0 (day, fee) }).reduceByKey(_+_) //总订单数 val total = baseData.count() /** * 获取充值成功的充值时长 */ val totalTime = baseData.map(obj=> { var reqId = obj.getString("requestId") //获取日期 val day = reqId.substring(0, 8) //取出该条充值是否成功的标示 val result = obj.getString("bussinessRst") //时间格式为:yyyyMMddHHmissSSS val endTime = obj.getString("receiveNotifyTime") val startTime = reqId.substring(0, 17) val format = new SimpleDateFormat("yyyyMMddHHmissSSS") val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0 (day, cost) }).reduceByKey(_+_) /** * 将数据存储到redis中: * (CMCC-20170412,35) */ totalSucc.foreachPartition(itr=> { val jedis = Jpools.getJedis itr.foreach(tp => { // print("CMCC-"+tp._1, tp._2) jedis.incrBy("CMCC-"+tp._1, tp._2) }) }) }) ssc.start() ssc.awaitTermination() } }
两个工具类:
package utils import com.typesafe.config.ConfigFactory import org.apache.kafka.common.serialization.StringDeserializer object AppParams { /**Scala中使用关键字lazy来定义惰性变量,实现延迟加载(懒加载)。 惰性变量只能是不可变变量,并且只有在调用惰性变量时,才会去实例化这个变量。 load中可以指定相应的配置文件,但是不指定的情况下默认去读取resources下的application.conf文件 默认规则:application.conf->application.json->application.properties **/ private lazy val config = ConfigFactory.load() val redisHost = config.getString("redis.host") val selectDBIndex = config.getInt("redis.db.index") /** * 返回订阅的主题 */ val topic = config.getString("kafka.topic").split(",") /** * kafka集群所在的主机和端口 */ val brokers:String = config.getString("kafka.broker.list") /** * 消费者的id */ val groupId = config.getString("kafka.group.id") /** * 将kafka的相关参数进行分装到map中 */ val kafkaParams = Map[String, Object]( "bootstrap.servers" -> brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer"-> classOf[StringDeserializer], "group.id"-> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> "false" ) }
package utils import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool /** * 创建一个redis的线程池 */ object Jpools { private val poolConfig = new GenericObjectPoolConfig poolConfig.setMaxIdle(5) //最大的空闲连接数为5,连接池中最大的空闲连接数,默认是8 poolConfig.setMaxTotal(2000) //最大支持的连接数量,默认也是8 //连接池是私有的,不能对外进行公开访问 private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost) def getJedis = { val jedis = jedisPool.getResource jedis.select(AppParams.selectDBIndex) jedis } }
pom文件
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <!-- 导入kafka的依赖--> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.0</version> </dependency>--> <!-- 指定kafka-client API的版本--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency> <!-- 导入spark streaming 与kafka的依赖包--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.46</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> </dependencies>
问题总结:
1.json格式的转换 (已解决)
2.flume读取数据到kafka后数据乱码增多问题(已解决)
3.flume spooldir 读取文件的同时对文件更改造成的java.lang.IllegalStateException:File has been modified since being read:问题 (待解决)
4.上述spark主程序代码优化问题 (待解决)
原文地址:https://www.cnblogs.com/zyc-2019/p/10596260.html