RDD之三:RDD创建方式

RDD创建方式

1)从Hadoop文件系统(如HDFS、Hive、HBase)输入创建。
2)从父RDD转换得到新RDD。
3)通过parallelize或makeRDD将单机数据创建为分布式RDD。

4)基于DB(Mysql)、NoSQL(HBase)、S3(SC3)、数据流创建。

从集合创建RDD

  • parallelize

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

从一个Seq集合创建RDD。

参数1:Seq集合,必须。

参数2:分区数,默认为该Application分配到的资源的CPU核数

[java] view plain copy

  1. scala> var rdd = sc.parallelize(1 to 10)
  2. rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :21
  3. scala> rdd.collect
  4. res3: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  5. scala> rdd.partitions.size
  6. res4: Int = 15
  7. //设置RDD为3个分区
  8. scala> var rdd2 = sc.parallelize(1 to 10,3)
  9. rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :21
  10. scala> rdd2.collect
  11. res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  12. scala> rdd2.partitions.size
  13. res6: Int = 3
  • makeRDD

def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

这种用法和parallelize完全相同

def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]

该用法可以指定每一个分区的preferredLocations。

[java] view plain copy

  1. scala> var collect = Seq((1 to 10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),
  2. (11 to 15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))
  3. collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
  4. List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))
  5. scala> var rdd = sc.makeRDD(collect)
  6. rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at :23
  7. scala> rdd.partitions.size
  8. res33: Int = 2
  9. scala> rdd.preferredLocations(rdd.partitions(0))
  10. res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)
  11. scala> rdd.preferredLocations(rdd.partitions(1))
  12. res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)

指定分区的优先位置,对后续的调度优化有帮助。

从外部存储创建RDD

  • textFile

//从hdfs文件创建.

[java] view plain copy

  1. //从hdfs文件创建
  2. scala> var rdd = sc.textFile("hdfs:///tmp/lxw1234/1.txt")
  3. rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at textFile at :21
  4. scala> rdd.count
  5. res48: Long = 4
  6. //从本地文件创建
  7. scala> var rdd = sc.textFile("file:///etc/hadoop/conf/core-site.xml")
  8. rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[28] at textFile at :21
  9. scala> rdd.count
  10. res49: Long = 97

注意这里的本地文件路径需要在Driver和Executor端存在。

  • 从其他HDFS文件格式创建

hadoopFile

sequenceFile

objectFile

newAPIHadoopFile

  • 从Hadoop接口API创建

hadoopRDD

newAPIHadoopRDD

比如:从Hbase创建RDD

[java] view plain copy

    1. scala> import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
    2. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
    3. scala> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    4. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    5. scala> import org.apache.hadoop.hbase.client.HBaseAdmin
    6. import org.apache.hadoop.hbase.client.HBaseAdmin
    7. scala> val conf = HBaseConfiguration.create()
    8. scala> conf.set(TableInputFormat.INPUT_TABLE,"lxw1234")
    9. scala> var hbaseRDD = sc.newAPIHadoopRDD(
    10. conf,classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])
    11. scala> hbaseRDD.count
    12. res52: Long = 1
时间: 2024-10-11 15:58:30

RDD之三:RDD创建方式的相关文章

Java接入Spark之创建RDD的两种方式和操作RDD

首先看看思维导图,我的spark是1.6.1版本,jdk是1.7版本 spark是什么? Spark是基于内存计算的大数据并行计算框架.Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark 部署在大量廉价硬件之上,形成集群. 下载和安装 可以看我之前发表的博客 Spark安装 安装成功后运行示例程序 在spark安装目录下examples/src/main目录中. 运行的一个Java或Scala示例程序,使用bin/run-examp

大数据基础教程:创建RDD的二种方式

大数据基础教程:创建RDD的二种方式 1.从集合中创建RDD val conf = new SparkConf().setAppName("Test").setMaster("local")      val sc = new SparkContext(conf)      //这两个方法都有第二参数是一个默认值2  分片数量(partition的数量)      //scala集合通过makeRDD创建RDD,底层实现也是parallelize      val 

大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Core 实例练习

第1章 RDD 概念1.1 RDD 为什么会产生1.2 RDD 概述1.2.1 什么是 RDD1.2.2 RDD 的属性1.3 RDD 弹性1.4 RDD 特点1.4.1 分区1.4.2 只读1.4.3 依赖1.4.4 缓存1.4.5 CheckPoint第2章 RDD 编程2.1 RDD 编程模型2.2 RDD 创建2.2.1 由一个已经存在的 Scala 集合创建,即集合并行化(测试用)2.2.2 由外部存储系统的数据集创建(开发用)2.3 RDD 编程2.3.1 Transformatio

Apache Spark RDD之RDD的转换

RDD的转换 Spark会根据用户提交的计算逻辑中的RDD的转换和动作来生成RDD之间的依赖关系,同时这个计算链也就生成了逻辑上的DAG.接下来以“Word Count”为例,详细描述这个DAG生成的实现过程. Spark Scala版本的Word Count程序如下: 1: val file = spark.textFile("hdfs://...") 2: val counts = file.flatMap(line => line.split(" "))

String变量的两种创建方式

在java中,有两种创建String类型变量的方式: String str01="abc";//第一种方式 String str02=new String("abc")://第二种方式 第一种方式创建String变量时,首先查找JVM方法区的字符串常量池是否存在存放"abc"的地址,如果存在,则将该变量指向这个地址,不存在,则在方法区创建一个存放字面值"abc"的地址. 第二种方式创建String变量时,在堆中创建一个存放&q

控制器的创建方式 -- 及其导航控制器的管理

一 控制器的创建方式 1.storyboard创建 1 self.window = [[UIWindow alloc] initWithFrame:[UIScreen mainScreen].bounds]; 2 3 self.window.backgroundColor = [UIColor blueColor]; 4 5 UIStoryboard *story = [UIStoryboard storyboardWithName:@"Main" bundle:nil]; 6 7 /

视图控制器的View创建方式

UIViewContrller 有三种创建方式: 1.通过alloc init直接创建. 2.通过故事版创建. 3.通过xib文件描述. 这是appDelegate.m的内容 //window的颜色是绿色 self.window = [[UIWindow alloc]initWithFrame:[UIScreen mainScreen].bounds]; self.window.backgroundColor = [UIColor greenColor]; /** 第一种:直接alloc ini

iOS控制器的创建方式

iOS控制器的创建.除了常见的alloc init外还有通过加载storyboard和xib的方式,下边逐一展开: 1.代码alloc init 创建方式 ViewController *vc= [[ViewController alloc] init]; 2.storyboard创建控制器 1>加载制定的storyboard文件 UIStoryboard *board =     [UIStoryboard storyboardWithName:@"viewCon" bundl

JAVA学习第二十三课(多线程(二))- (多线程的创建方式二 :实现Runnable接口(常用))

当一个类有父亲,但是其中的功能还希望实现线程,那么就不能采用继承Thread的方式创建线程 那么就可以通过接口的方式完成 准备扩展Demo类的功能,让其中的内容可以作为线程的任务执行 实现Runnable接口,Runnable接口中只有一个方法run 一.创建线程的第二种方法 Runnable的出现仅仅是将线程的任务进行了对象的封装 /* * 创建线程的第二种方法 * 1.定义类实现Runnable接口 * 2.覆盖接口中的fun方法,将线程的任务代码封装到run方法中 * 3.通过Thread