中国移动实时数据分析-基于spark+kafka+flume

  这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zookeeper 主要是为了熟悉一下整个的一个spark-streaming的一个整个流程,还有就是了解调优的地方。

  上述假设已经安装好了相应的组件,然后就开始正式的踩坑之路:

  1.编写一个java程序去读取原始数据文件,模拟1s进行文件的插入一行,原始的数据文件格式如下:

    

     坑a

    .整个的数据格式是json,但是是一整行的。。。。

    解决a1:于是就想这去把这样的数据转化为json格式的,就去捣鼓了一下notepad++转json格式的方法:notepad++上面的菜单栏中,插件-> plugins Admin..->search中直接查找就好了,然后找找有个install的按钮点击一下就ok了,然后各种确定,之后notepad++会自动重启,重启之后上面的菜单栏中,插件->就会多出一个JSON Viewer,然后就可以了。但是我操作的时候遇到了notepad++重启之后没有出现JSON Viewer(但是后来又出现了),

    解决a2:于是又去找了idea实现json格式的方法:setting->keymap->main enum->code->reformat code 这个功能是将文本格式化,该功能的快捷键默认是ctrl+shift+l,但是这个快捷键组合是有冲突的,所以将其转化为ctrl+shift+s,修改后进行保存,然后创建一个xxx.json的文件,复制一行json数据到该文件中,然后全选,按下ctrl+shift+s即可转化为标准的json文件格式

    

    相应的java实现代码如下:


import java.io.*;import java.util.ArrayList;import java.util.List;

public class WriteCMCC {    public static void main(String[] args) {        List<String> allLines = getCmcc(args[0]);        System.out.println(allLines.size());        writeCmcc(allLines, args[1]);    }

    /**     * 一次性读取cmcc中的数据     * @return 存放在list中     */    private static List<String> getCmcc(String path) {        BufferedReader br = null;        List<String> allLines = new ArrayList<String>();        try {            br = new BufferedReader(new FileReader(new File(path)));            String line = "";            while ((line = br.readLine()) != null) {                allLines.add(line);            }        } catch (Exception e) {            e.printStackTrace();        }finally {            try {                if (br != null) br.close();            } catch (IOException e) {                e.printStackTrace();            }        }

        return allLines;    }

    /**     * 写入cmcc中的数据,一次写入一个list的数据集     */    private static void writeCmcc(List<String> cmcc, String path) {

        BufferedWriter bw = null;        try {            bw = new BufferedWriter(new FileWriter(new File(path)));            for(String line : cmcc) {                bw.write(line);                bw.flush();                Thread.sleep(1000);                bw.newLine();            }

        } catch (Exception e) {            e.printStackTrace();        }finally {            try {                if (bw != null) bw.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }}
 

    代码写好,然后测试完,然后打成jar包,丢到Linux准备运行。

    java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.log

  2.flume编写相应的conf去把数据抽取到kafka中(cmcc.conf)

    先启动zookeeper,启动kafka并创建topic(cmcc):

      zookeeper启动命令:

        /home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每个节点都需要启动)
      kafka启动命令:
        /home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
      kafka创建topic:
        bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor

      kafka查看所有的topic:
        bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list

    然后编写conf测试(cmcc.conf):

      

a1.sources = s1
a1.channels = c1

#这里先不使用该种方式去读取文件,因为该方式flume会出如下的错误
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#原因:出现这个问题的原因是,当我们拷贝一个文件的时候,一些对文件进行了修改
#解决:最好的方法就是,确保大文件完全拷贝后,再让flume来读取,思路是将拷贝中的文件加上一个多余的后缀,flume一开始不会读取文件,当文件拷贝完成后去掉多余的后缀,这个时候flume就会针对新文件进行读取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true

a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#创建好相应的topic
a1.channels.c1.kafka.topic = cmcc
#这个是自己定义的没啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#这个一定要设置,否则就是个坑,写入到kafka中的数据会被追加进一些数据,而且还是乱码的
a1.channels.c1.parseAsFlumeEvent = false

#拼接source和channel
a1.sources.s1.channels=c1

     flume启动命令:下面的a1就对应着上面的a1(控制台打印信息)
      bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console

  3.spark程序去读取kafka的中的数据并将结果存放至redis中

    启动redis:/usr/local/redis/bin/redis-server  /usr/local/redis/etc/redis.conf

    程序相应的配置:resources -> application.conf

#kafka的相关参数
kafka.topic = "cmcc"
kafka.broker.list="os1:9092,os2:9092,os3:9092"
kafka.group.id="cmcc"
redis.host="xxx.xxx.xxx.xxx"
redis.db.index="0"

    主程序代码:scala -> BootStarpApp

package app

import java.text.SimpleDateFormat

import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.{AppParams, Jpools}

object BootStarpApp {
  def main(args: Array[String]): Unit = {

    /**
      * 错误集:
      * 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
      *   错误解释,kafka在进行序列化实例对象的时候出错
      *   查找原因:
      *   org.codehaus.jackson.map.deser.std.StringDeserializer是我们AppParas中导入的类型,可能是导错了,
      *   查看后发现应该导入:import org.apache.kafka.common.serialization.StringDeserializer
      * 2. 程序出现INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序不再执行下去
      *   原因:因为kafka-clent程序默认读取到kafka上的信息之后将host:os3返回作为主机节点去获取数据,但是在本机中没有配置相应的host与ip的映射,所有这里就无法直接进行访问os3
      *   解决办法;在windows中配置相应的ip与hostname的映射(kafka中的broker节点)
      * 3.json解析出错:error parse false
      *   原因json格式错误
      *
      * 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false  1.7以后默认为true
      * 如果设置此项为 true,Kafka Sink 则会把数据按照标准的 Flume Event 格式(即Headers域和body域结合的数据结构)发送。Flume Event 中的 Headers 域通常是一些附加字段,可以是时间戳(比如时间戳拦截器指定的时间戳)、文件名(比如 spooldir Source 开启的 fileHeader = true)等信息。但是 1.7.0 版本的 Flume 一旦开启此配置,会导致 Headers 域里面的信息乱码
      *
      * 5.flume异常崩溃 File has been modified since being read
      *   原因:出现这个问题的原因是,当我们拷贝一个文件的时候,一些对文件进行了修改,就会出现这个错误
      *   解决:最好的方法就是,确保大文件完全拷贝后,再让flume来读取,思路是将拷贝中的文件加上一个多余的后缀,flume一开始不会读取文件,当文件拷贝完成后去掉多余的后缀,这个时候flume就会针对新文件进行读取。
      *   另外针对大文件,flume的解决方案可以设置一个文件完成后缀:
      */

    val sparkConf = new SparkConf()

    sparkConf.setAppName("中国移动运营实时监控平台")
    sparkConf.setMaster("local[*]")

    /**
      *将rdd以kryo的序列化保存,以减少内存的使用
      */
    sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    /**
      * 对rdd进行压缩,使用内存空间换去处理时间的方式,减少内存的使用
     */
    sparkConf.set("spark.rdd.compress", "true")

    /**
      *
      */
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")

    /**
      * 进行优雅的停止程序
      */
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    /**
      * 每两秒执行一个批次
      */
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /**
      * 获取kafka的数据
      * LocationStrategies:位置策略,如果kafka的broker节点与Excutor在同一台机器上给一种策略,不再一台机器上给另一种策略
      * 设定策略之后会以最有的策略进行获取数据
      * 一般在企业中kafka节点与Excutor不会放到一台机器的,原因是kafka是消息存储的,Executor是用来做消息计算的
      * 因此计算与存储需要分开,存储对磁盘要求高,计算对内存和cpu的要求更高
      * 如果Executor节点跟Broker的节点在一起的话就使用PreferBrokers策略,不再一起的话就使用preferConsisent策略
      * 使用preferConsisent策略的话,将来在kafka中拉去数据以后尽量将数据分散到所有的Executor上
      */
    val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent
      , ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams))

    stream.foreachRDD(rdd => {

      /**
        * {
        * "bussinessRst": "0000",
        * "channelCode": "0705",
        * "chargefee": "10000",
        * "clientIp": "125.82.117.133",
        * "endReqTime": "20170412080609613",
        * "idType": "01",
        * "interFacRst": "0000",
        * "logOutTime": "20170412080609613",
        * "orderId": "384681890175026754",
        * "prodCnt": "1",
        * "provinceCode": "280",
        * "requestId": "20170412080450886738519397327610",
        * "retMsg": "成功",
        * "serverIp": "172.16.59.241",
        * "serverPort": "8088",
        * "serviceName": "sendRechargeReq",
        * "shouldfee": "9950",
        * "startReqTime": "20170412080609503",
        * "sysId": "15"
        * }
        */

      /**
        *  业务逻辑:
        *   serviceName:reChargeNotifyReq,则为充值通知的记录
        *   requestId:包含充值的日期(订单开始时间)
        *   bussinessRst:是否成功 0000 为成功,其他为不成功
        *   chargefee:充值的金额
        *   receiveNotifyTime:订单结束时间
        *
        */

      /**
        * 我们可以通过serviceName字段来确定,如果该字段是reChargeNotifyReq则代表该条数据是充值通知部分的数据。
        * 获取所有的充值通知日志
        */
      val baseData = rdd.map(cr => {
        print(cr.value())
        JSON.parseObject(cr.value())
      }).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache()

      /**
        * 获取每天充值成功的订单笔数
        * 回忆:
        *   wordcount flatMap-》map-》reduceByKey
        */
      val totalSucc = baseData.map(obj=> {
        //获取日期
        val reqId = obj.getString("requestId")
        //获取日期
        val day = reqId.substring(0, 8)
        //取出该条充值是否成功的标志
        val result = obj.getString("bussinessRst")
        val flag = if(result.equals("0000")) 1 else 0
        (day, flag)
      }).reduceByKey(_+_)

      /**
        * 获取充值成功的订单金额
        */
      val totalMoney = baseData.map(obj=> {
        val reqId = obj.getString("requestId")
        //获取日期
        val day = reqId.substring(0, 8)
        //去除该条充值是否成功的标记
        val result = obj.getString("bussinessRst")
        val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0
        (day, fee)
      }).reduceByKey(_+_)

      //总订单数
      val total = baseData.count()

      /**
        * 获取充值成功的充值时长
        */
      val totalTime = baseData.map(obj=> {
        var reqId = obj.getString("requestId")
        //获取日期
        val day = reqId.substring(0, 8)

        //取出该条充值是否成功的标示
        val result = obj.getString("bussinessRst")
        //时间格式为:yyyyMMddHHmissSSS
        val endTime = obj.getString("receiveNotifyTime")
        val startTime = reqId.substring(0, 17)

        val format = new SimpleDateFormat("yyyyMMddHHmissSSS")

        val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0
        (day, cost)
      }).reduceByKey(_+_)

      /**
        * 将数据存储到redis中:
        * (CMCC-20170412,35)
        */
      totalSucc.foreachPartition(itr=> {
       val jedis = Jpools.getJedis
        itr.foreach(tp => {
         // print("CMCC-"+tp._1, tp._2)
          jedis.incrBy("CMCC-"+tp._1, tp._2)
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

  两个工具类:

package utils

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

object AppParams {
  /**Scala中使用关键字lazy来定义惰性变量,实现延迟加载(懒加载)。
  惰性变量只能是不可变变量,并且只有在调用惰性变量时,才会去实例化这个变量。
    load中可以指定相应的配置文件,但是不指定的情况下默认去读取resources下的application.conf文件
      默认规则:application.conf->application.json->application.properties
    **/
  private lazy val config = ConfigFactory.load()

  val redisHost = config.getString("redis.host")
  val selectDBIndex = config.getInt("redis.db.index")
  /**
    * 返回订阅的主题
    */
  val topic = config.getString("kafka.topic").split(",")

  /**
    * kafka集群所在的主机和端口
    */
  val brokers:String = config.getString("kafka.broker.list")

  /**
    * 消费者的id
    */
  val groupId = config.getString("kafka.group.id")

  /**
    * 将kafka的相关参数进行分装到map中
    */
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer"-> classOf[StringDeserializer],
    "group.id"-> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "false"
  )
}
package utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * 创建一个redis的线程池
  */
object Jpools {
  private val poolConfig = new GenericObjectPoolConfig
  poolConfig.setMaxIdle(5) //最大的空闲连接数为5,连接池中最大的空闲连接数,默认是8
  poolConfig.setMaxTotal(2000) //最大支持的连接数量,默认也是8

  //连接池是私有的,不能对外进行公开访问
  private lazy val  jedisPool = new JedisPool(poolConfig, AppParams.redisHost)

  def getJedis = {
    val jedis = jedisPool.getResource
    jedis.select(AppParams.selectDBIndex)
    jedis
  }
}

  pom文件

<dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <!-- 导入kafka的依赖-->
       <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>-->
        <!-- 指定kafka-client API的版本-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <!-- 导入spark streaming 与kafka的依赖包-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

    </dependencies>

问题总结:

  1.json格式的转换 (已解决)

  2.flume读取数据到kafka后数据乱码增多问题(已解决)

  3.flume  spooldir  读取文件的同时对文件更改造成的java.lang.IllegalStateException:File has been modified since being read:问题 (待解决)

  4.上述spark主程序代码优化问题 (待解决)

原文地址:https://www.cnblogs.com/zyc-2019/p/10596260.html

时间: 2024-10-09 20:51:29

中国移动实时数据分析-基于spark+kafka+flume的相关文章

&lt;颠覆大数据分析 基于StormSpark等Hadoop替代技术的实时应用&gt;

为什么要超越Hadoop MapReduce Hadoop的适用范围 Hadoop缺乏对象数据库连接(ODBC) Hadoop不适合所有类型的应用程序 hadoop不适合分片数据 Hadoop不适合迭代式计算 海量数据分析所需的计算范式分类(7大任务) 基础分析 线性代数计算 广义的多体问题 图论问题 优化 积分 比对问题 Hadoop非常适合第一类基础分析,对于其他问题,较简单或者小型的任务都是Hadoop可解的. 于是有了Spark,spark可以看做是大数据领域下一个数据处理的Hadoop

大数据Storm开发实时数据分析平台视频教程

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

走在大数据的边缘 基于Spark的机器学习-智能客户系统项目实战(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP

京东基于Spark的风控系统架构实践和技术细节

京东基于Spark的风控系统架构实践和技术细节 时间 2016-06-02 09:36:32  炼数成金 原文  http://www.dataguru.cn/article-9419-1.html 主题 Spark软件架构 1.背景 互联网的迅速发展,为电子商务兴起提供了肥沃的土壤.2014年,中国电子商务市场交易规模达到13.4万亿元,同比增长31.4%.其中,B2B电子商务市场交易额达到10万亿元,同比增长21.9%.这一连串高速增长的数字背后,不法分子对互联网资产的觊觎,针对电商行业的恶

基于spark排序的一种更廉价的实现方案-附基于spark的性能测试

排序可以说是很多日志系统的硬指标(如按照时间逆序排序),如果一个大数据系统不能进行排序,基本上是这个系统属于不可用状态,排序算得上是大数据系统的一个"刚需",无论大数据采用的是hadoop,还是spark,还是impala,hive,总之排序是必不可少的,排序的性能测试也是必不可少的. 有着计算奥运会之称的Sort Benchmark全球排序每年都会举行一次,每年巨头都会在排序上进行巨大的投入,可见排序速度的高低有多么重要!但是对于大多数企业来说,动辄上亿的硬件投入,实在划不来.甚至远

大数据实时处理-基于Spark的大数据实时处理及应用技术培训

随着互联网.移动互联网和物联网的发展,我们已经切实地迎来了一个大数据 的时代.大数据是指无法在一定时间内用常规软件工具对其内容进行抓取.管理和处理的数据集合,对大数据的分析已经成为一个非常重要且紧迫的需求.目前对大数据的分析工具,首选的是Hadoop/Yarn平台,但目前对大数据的实时分析工具,业界公认最佳为Spark.Spark是基于内存计算的大数据并行计算框架,Spark目前是Apache软件基金会旗下,顶级的开源项目,Spark提出的DAG作为MapReduce的替代方案,兼容HDFS.H

Druid 实时数据分析存储系统

简介 Druid 是一个开源的,分布式的,列存储的,适用于实时数据分析的存储系统,能够快速聚合.灵活过滤.毫秒级查询.和低延迟数据导入. Druid在设计时充分考虑到了高可用性,各种节点挂掉都不会使得druid停止工作(但是状态会无法更新): Druid中的各个组成部分之间耦合性低,如果不需要实时数据完全可以忽略实时节点: Druid使用Bitmap indexing加速列存储的查询速度,并使用CONCISE算法来对bitmap indexing进行压缩,使得生成的segments比原始文本文件