大数据-spark理论(3)sparkSql,sparkStreaming,spark调优

导读目录

第一节:sparksql

  1:简介

  2:核心

  3:与hive整合

  4:dataFrame

  5:函数

第二节:spark Streaming

  1:对比strom

  2:DStream的算子

  3:代码

  4:driver HA

  5:读取数据

第三节:spark调优

第一节:sparksql

  (1)简介:

    Shark:shark是sparksql的前身,hive是shark的前身

    快的原因:不仅是内存,还有谓词下移(减少一定量的数据IO)

    

                正常                         谓词下移

             (先关联表在切割)                  (先将表中的字段过滤,再join)

  (2)核心:

    sql的解析优化,执行引擎全是spark;

    兼容hive的所有sql;

    可以直接访问RDD,spark的核心就是RDD;

    Dataframe:对RDD进行包装,自己的存储数据集合;

  

  (3)与hive整合:

    3.1 整合的方式

    第一种:hive on spark(实际就是shark):

      存储,sql解析优化hive实现

      执行引擎是spark

    第二种:spark on hive:

      存储是hive

      sql解析优化,执行引擎都是spark

      应用:

        1、安装配置

          拷贝hive-site.xml文件到conf目录,

            只保留thrift://node3:9083

          在启动application的时候能看到连接9083端口的信息

          创建HiveContext对象

        2、执行引擎

          数据存储在hive中

          解析优化执行全部是spark来执行

    3.2 代码(从hive中读数据,往hive中写数据)

SparkConf conf = new SparkConf();
conf.setAppName("hive");
JavaSparkContext sc = new JavaSparkContext(conf);
//HiveContext是SQLContext的子类。
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("USE spark");
hiveContext.sql("DROP TABLE IF EXISTS student_infos");
//在hive中创建student_infos表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos (name STRING,age INT) row format delimited fields terminated by ‘\t‘ ");
hiveContext.sql("load data local inpath ‘/root/test/student_infos‘ into table student_infos");
hiveContext.sql("DROP TABLE IF EXISTS student_scores");
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores (name STRING, score INT) row format delimited fields terminated by ‘\t‘");
hiveContext.sql("LOAD DATA LOCAL INPATH ‘/root/test/student_scores‘ INTO TABLE student_scores");
/**
 * 查询表生成DataFrame
 */
DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score FROM student_infos si JOIN student_scores ss ON si.name=ss.name WHERE ss.score>=80");

hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.registerTempTable("goodstudent");
DataFrame result = hiveContext.sql("select * from goodstudent");
result.show();

/**
 * 将结果保存到hive表 good_student_infos
 */
goodStudentsDF.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");

Row[] goodStudentRows = hiveContext.table("good_student_infos").collect();
for(Row goodStudentRow : goodStudentRows) {
    System.out.println(goodStudentRow);
}
sc.stop();

    3.3 提交到集群的指令

        ./spark-submit

        --master spark://node1:7077,node2:7077

        --executor-cores 1

        --executor-memory 2G

        --total-executor-cores 1

        --class com.bjsxt.sparksql.dataframe.CreateDFFromHive

        /root/test/HiveTest.jar

  (4)dataFrame:

    4.1、拥有独立的api,所以还是代码形式的,所以不是很好用,还是以sql形式的好

      df.show()只能显示20行,可以添加参数

      dataframe可以转换成RDD

        .javaRDD

        .rdd

        //以上生成的是list形式,可以通过以下获取具体列

        .get(0)

        .getAs(“name”)

      直接执行sql

        df.registerTempTable(“表名”) //将dataframe数据注册成临时表,列名时按照ascii码排列的

        SqlContext.sql(“查询上面表名中的数据即可”)

    4.2、创建dataframe方式

      (1)读取json格式的数据

        sqlContext.read().format("json").load(path)

        sqlContext.read().json(path)

        注意点:

          1、json数据不能嵌套

      (2)读取json格式的RDD

        sqlContext.read().json(rdd)

      (3)通过反射的方式创建Dataframe,将rdd封装到对象中

        1.定义具体的对象类

        2.map算子进行源文件切割,包装成对象(这个对象必须序列化)

          rdd.map() //将rdd切割之后对应的封装到对象中

        3.映射创建

          sqlContext.creatDataframe(rdd, Persion.class)

      (4)通过struct方式创建Dataframe

        1.在切割源文件的时候,使用rowFactory.create()

          rdd.map()的返回值是rowFactory.create(),得到Row类型的RDD

        2.规定structType,使用DataTypes来创建

          List<StructField> asList =Arrays.asList(

            DataTypes.createStructField("id", DataTypes.StringType, true),

            DataTypes.createStructField("name", DataTypes.StringType, true),

            DataTypes.createStructField("age", DataTypes.IntegerType, true)

          );

          StructType schema = DataTypes.createStructType(asList);

          DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

      (5)读取parquet文件(是一个列式存储)创建Dataframe

          sqlContext.read().format("parquet").load(path)

          sqlContext.read().parquet(path);

      (6)读取mysql数据创建Dataframe

          连接mysql的时候使用jdbc的方式

          设置参数(driver,url,user,password,Dbtable)

          1.sqlContext.read().options(map).format("jdbc").load()

            例子:

Map<String, String> options = new HashMap<String,String>();
options.put("url", "jdbc:mysql://192.168.179.4:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");
DataFrame person = sqlContext.read().format("jdbc").options(options).load();

          2.sqlContext.read().format("jdbc").load()

            例子:

DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://192.168.179.4:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");
DataFrame score = reader.load();

    4.3、本地执行hive

      1、拷贝当前配置文件到src目录:hive-site.xml,core-site.xml,hdfs-site.xml

      2、添加jar,以data开头的三个jar文件

      3、window环境必须是以root用户名命名的

      4、执行的时候内存有可能不够,添加VM参数配置:

        -server -Xms512M -Xmx1024M -XX:PermSize=256M -XX:MaxNewSize=512M -XX:MaxPermSize=512M

    4.4、写数据

      (1)写入数据源

        parquet

        df.write().mode(SaveMode).format("parquet").save(“路径”)

        例子:

          df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");

          df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");

        hive

        df.write().mode(SaveMode).saveAsTable()

        mysql

        df.write().mode(SaveMode).format("JDBC").save()

      (2)写数据操作

        df.write().mode(SaveMode).save()

        SaveMode

          append:追加

          overwrite:覆盖

          ignore:如果存在就忽略

          ErrorIfExists:存在即报错

  (5)函数

    5.1 udf

        sqlContext.udf().register(方法名称,new UDF1..22,返回值类型),即最多22个参数

        例子:

val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);
val rdd = sc.makeRDD(Array("zhansan","lisi","wangwu"))
val rowRDD = rdd.map { x => {
    RowFactory.create(x)
} }

val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))

val df = sqlContext.createDataFrame(rowRDD, schema)
df.registerTempTable("user")

//sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
//sqlContext.sql("select name ,StrLen(name) as length from user").show
sqlContext.udf.register("StrLen",(s : String,i:Int)=>{s.length()+i}) //定义的函数
sqlContext.sql("select name ,StrLen(name,10) as length from user").show //引用

sc.stop()

    5.2 Udaf(聚合):实现UDAF函数如果要自定义类要继承UserDefinedAggregateFunction类

        sqlContext.udf().register(函数名称,new UserDefinedAggratedFunction())

        例子:

class MyUDAF extends UserDefinedAggregateFunction  {
  // 定义缓存区参数的类型
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }
  // 最终函数返回值的类型
  def dataType: DataType = {
    DataTypes.IntegerType
  }
  def deterministic: Boolean = {
    true
  }
  // 最后返回一个最终的聚合值,要和dataType的类型一一对应
  def evaluate(buffer: Row): Any = {
   buffer.getAs[Int](0)
  }
  // 为每个分组的数据执行初始化值,重点
  def initialize(buffer: MutableAggregationBuffer): Unit = {
   buffer(0) = 0
  }
  //输入数据的类型
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }
  // 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并,重点(合并所有节点)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
  }
  // 每个组,有新的值进来的时候,进行分组对应的聚合值的计算,重点(每个组上的相同key的做操作)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0)+1
  }
}

object UDAF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udaf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.makeRDD(Array("zhangsan","lisi","wangwu","zhangsan","lisi"))
    val rowRDD = rdd.map { x => {RowFactory.create(x)} }
    val schema = DataTypes.createStructType(Array(DataTypes.createStructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()
    df.registerTempTable("user")

    /**
     * 注册一个udaf函数
     */
    sqlContext.udf.register("StringCount", new MyUDAF())
    sqlContext.sql("select name ,StringCount(name) from user group by name").show()

    sc.stop()
  }}

    5.3 开窗函数: over(专门解决某些特定场景的问题)

        例子:分组取topn

          用到的开窗函数:row_number():其中的一个开窗函数,还有很多其他的开窗函数

          用法:row_number() over(partition by xxx order by xxx desc as rank)

          代码:

SparkConf conf = new SparkConf();
conf.setAppName("windowfun");
JavaSparkContext sc = new JavaSparkContext(conf);
HiveContext hiveContext = new HiveContext(sc);
hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) row format delimited fields terminated by ‘\t‘");
hiveContext.sql("load data local inpath ‘/root/test/sales‘ into table sales");

 /**
  * 开窗函数格式:
  * 【 rou_number() over (partitin by XXX order by XXX) 】
  */
DataFrame result = hiveContext.sql("select riqi,leibie,jine from (select riqi,leibie,jine,"
                    + "row_number() over (partition by leibie order by jine desc) rank from sales) t where t.rank<=3");
result.show();

sc.stop();

第二节:spark Streaming

  1、对比strom

    

      不建议使用动态资源,因为你释放资源之后,如果再用的话被占用,那么就影响了流式的速度。

  2、DStream的算子

    (1)Transformation算子

      1、updateStateByKey:只要启动之后就开始统计所有key的状态。

        需要开启checkpoint:

          sparkContext.setCheckpoint(“定义存状态的路径”) 或者StreamingContext.checkpoint(“定义存状态的路径”)

        这个状态在内存是存在的,那么多久的时间写入磁盘呢?

          如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。如果batchInterval设置的时间大于10秒,那么就会 batchInterval时间间隔写入磁盘一份

        举例:new StreamingContext(conf, Durations.seconds(5)) //每5秒钟记录一次

      上面是记录的所有的记录,那么如何记录一段时间内的记录,用窗口函数:

      2、窗口函数举例:reduceByKeyAndWindow(一个function,Durations.seconds(15),Durations.seconds(5))

               //每隔5秒(滑动间隔)记录前15秒(窗口长度)的状态

               //未优化的普通机制不需要设置checkpoint

        

        按照这个图上的分析:

          我们设置的是每隔5秒计算一次,那么一个绿框就是5秒的数据

          优化的机制:(假如每隔1秒计算过去一年的,那么可能会产生任务堆积)

            我们可以在计算的逻辑上,用当前的加上新的状态,减去不要的状态,这个时候需要设置checkpoint

        .windows(Durations.seconds(15),Durations.seconds(5)) //自己定义窗口函数

      3、transform:

        是在driver端执行的,可以动态广播变量。

        可以对Dstream中的RDD做RDD与RDD之间的任意操作,不需要action算子触发。

    (2)outPutOperator算子

      foreachRDD:

        这个是streaming的outPutOperator算子,所以执行就触发。(所以可以动态的发布广播变量)

        如果在这里面用了Transformation算子,那么不用action算子触发的话,这个里面的Transformation算子不会执行。

  3、代码

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountOnline");

/**
 * 在创建streaminContext的时候 设置batchInterval
 */
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
JavaReceiverInputDStream<String> lines = jsc.socketTextStream("node5", 9999);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  private static final long serialVersionUID = 1L;
  @Override
  public Iterable<String> call(String s) {
    return Arrays.asList(s.split(" "));
  }
});

JavaPairDStream<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);

  }
});

JavaPairDStream<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
  private static final long serialVersionUID = 1L;

  @Override
  public Integer call(Integer i1, Integer i2) {
    return i1 + i2;
  }
});
//outputoperator类的算子
counts.print();
jsc.start();

//等待spark程序被终止
jsc.awaitTermination();jsc.stop(false);

  4、driver HA

    (1) 提交任务层面,在提交任务的时候加上选项 --supervise,当Driver挂掉的时候会自动重启Driver

    (2) 代码层面,使用JavaStreamingContext.getOrCreate(checkpoint路径,JavaStreamingContextFactory)

      Driver中元数据包括:

    1. 创建应用程序的配置信息。
    2. DStream的操作逻辑。
    3. job中没有完成的批次数据,也就是job的执行进度。

  

  5、读取数据

    (1) 监控文件的数据

    (2) Kafka(几乎都是结合kafka使用)

      整合1:receiver模式,已被淘汰

      

      整合2:direct模式

      

    (3) sparkStreaming+kafka版本变化:

      Direct模式,1.6用的是simple API,2.0用的是new API,所以代码有变化。

      可以用kafka管理offset,但是是异步提交方式。

第三节:spark 调优

  1、资源调优

    集群:

      SPARK_WORKER_MEMORY

      SPARK_WORKER_CORES

    提交任务:

      ./spark-submit......

      --driver-cores

      --driver-memory

      --executor-cores

      --excutor-memory

      --totail-executor-cores

      最好在提交任务时指定

  2、并行度调优

    即提升partition个数

    生成RDD或者一些算子指定partition个数。

    Reparation/coalesce

    Spark.default.parallelism

    Spark.sql.shuffle.partitions

    自定义分区器

  3、代码调优

    (1)不要频繁创建RDD,复用同一个RDD

    (2)对RDD的持久化

    (3)尽量避免使用shuffle类算子

    (4)尽量使用高性能的算子

      使用reduceByKey替代groupByKey

      使用mapPartition替代map

      使用foreachPartition替代foreach

      filter后使用coalesce减少分区数

      使用使用repartitionAndSortWithinPartitions替代repartition与sort类操作

      使用repartition和coalesce算子操作分区。

    (5)使用map-side预聚合的shuffle操作

      即尽量使用有combiner的shuffle类算子。

      combiner概念:

        在map端,每一个map task计算完毕后进行的局部聚合。

      combiner好处:

        1) 降低shuffle write写磁盘的数据量。

        2) 降低shuffle read拉取数据量的大小。

        3) 降低reduce端聚合的次数。

      有combiner的shuffle类算子:

        1) reduceByKey:这个算子在map端是有combiner的,在一些场景中可以使用reduceByKey代替groupByKey。

        2) aggregateByKey

        3) combineByKey

    (6) 使用广播变量

  

  4、数据本地化调优

    级别:

      1) PROCESS_LOCAL

      2) NODE_LOCAL

      3) NO_PREF

      4) RACK_LOCAL

      5) ANY

原文地址:https://www.cnblogs.com/dblog/p/12172859.html

时间: 2024-10-14 21:44:44

大数据-spark理论(3)sparkSql,sparkStreaming,spark调优的相关文章

“大数据讲师”、“Hadoop讲师”、“Spark讲师”、“云计算讲师”、“Android讲师”

王家林简介 Spark亚太研究院院长和首席专家,中国目前唯一的移动互联网和云计算大数据集大成者. 在Spark.Hadoop.Android等方面有丰富的源码.实务和性能优化经验.彻底研究了Spark从0.5.0到0.9.1共13个版本的Spark源码,并已完成2014年5月31日发布的Spark1.0源码研究. Hadoop源码级专家,曾负责某知名公司的类Hadoop框架开发工作,专注于Hadoop一站式解决方案的提供,同时也是云计算分布式大数据处理的最早实践者之一: Android架构师.高

大数据开发:剖析Hadoop和Spark的Shuffle过程差异

一.前言 对于基于MapReduce编程范式的分布式计算来说,本质上而言,就是在计算数据的交.并.差.聚合.排序等过程.而分布式计算分而治之的思想,让每个节点只计算部分数据,也就是只处理一个分片,那么要想求得某个key对应的全量数据,那就必须把相同key的数据汇集到同一个Reduce任务节点来处理,那么Mapreduce范式定义了一个叫做Shuffle的过程来实现这个效果. 二.编写本文的目的 本文旨在剖析Hadoop和Spark的Shuffle过程,并对比两者Shuffle的差异. 三.Had

大并发高负载下的PHP-FPM参数调优

大并发高负载下的PHP-FPM参数调优 主要针对PHP在Linux下的参数调优 调整文件描述符限制 # ulimit -n 1000000 # vi /etc/security/limits.conf # Setting Shell Limits for File Descriptors *  soft nofile 1000000 *  hard nofile 1000000 禁止PHP代码文件所在分区的文件系统访问时间更新 # vi /etc/fstab 比如PHP代码所在分区: /dev/

看懂大数据的技术生态圈 Hadoop,hive,spark(转载)

先给出原文链接: 原文链接 大数据本身是个很宽泛的概念,Hadoop生态圈(或者泛生态圈)基本上都是为了处理超过单机尺度的数据处理而诞生的.你可以把它比作一个厨房所以需要的各种工具.锅碗瓢盆,各有各的用处,互相之间又有重合.你可以用汤锅直接当碗吃饭喝汤,你可以用小刀或者刨子去皮.但是每个工具有自己的特性,虽然奇怪的组合也能工作,但是未必是最佳选择. 大数据,首先你要能存的下大数据. 传统的文件系统是单机的,不能横跨不同的机器.HDFS(Hadoop Distributed File System

了解大数据的技术生态系统 Hadoop,hive,spark(转载)

首先给出原文链接: 原文链接 大数据本身是一个很宽泛的概念,Hadoop生态圈(或者泛生态圈)基本上都是为了处理超过单机尺度的数据处理而诞生的.你能够把它比作一个厨房所以须要的各种工具. 锅碗瓢盆,各有各的用处.互相之间又有重合.你能够用汤锅直接当碗吃饭喝汤,你能够用小刀或者刨子去皮. 可是每一个工具有自己的特性,尽管奇怪的组合也能工作,可是未必是最佳选择. 大数据,首先你要能存的下大数据. 传统的文件系统是单机的,不能横跨不同的机器. HDFS(Hadoop Distributed File

北京上海广州Cloudera Hadoop大数据:CCAH(管理员)、CCA(Spark and Hadoop)、HBase

上海5月21-24日ClouderaAaminisrrator Training for Apache Hadoop(CCAH) 广州6月1-3日Cloudera Trainingfor Apache Hbase 广州6月18-21日Cloudera Developertraining for Spark and Hadoop(CCA-175) 上海6月27-30日Cloudera Developertraining for Spark and Hadoop(CCA-175) 北京7月7-10日

大数据之二:Hadoop与Spark辨析

转载自知乎:https://www.zhihu.com/question/26568496 1) MapReduce:是一种离线计算框架,将一个算法抽象成Map和Reduce两个阶段进行 处理,非常适合数据密集型计算. 2) Spark:MapReduce计算框架不适合迭代计算和交互式计算,MapReduce是一种磁盘 计算框架,而Spark则是一种内存计算框架,它将数据尽可能放到内存中以提高迭代 应用和交互式应用的计算效率. 3) Storm:MapReduce也不适合进行流式计算.实时分析,

大数据项目实践:基于hadoop+spark+mongodb+mysql开发医院临床知识库系统

一.前言 从20世纪90年代数字化医院概念提出到至今的20多年时间,数字化医院(Digital Hospital)在国内各大医院飞速的普及推广发展,并取得骄人成绩.不但有数字化医院管理信息系统(HIS).影像存档和通信系统(PACS).电子病历系统(EMR)和区域医疗卫生服务(GMIS)等成功实施与普及推广,而且随着日新月异的计算机技术和网络技术的革新,进一步为数字化医院带来新的交互渠道譬如:远程医疗服务,网上挂号预约. 随着IT技术的飞速发展,80%以上的三级医院都相继建立了自己的医院信息系统

大数据学习:Scala面向对象和Spark一些代码读和问

画外音: Spark对面向对象的支持是非常完美的 主题: 1.简单的类: 2.重写getter.setter方法: 3.利用其它方法来控制外部对值的控制: 4. private[this]: 5.构造器以及构造器相关: 直接代码见真章: ==========最简单的类============ scala> class HiScala{ | private var name = "Spark" | def sayName(){println(name)} | def getName