创建spark_读取数据

在2.0版本之前,使用Spark必须先创建SparkConf和SparkContext,不过在Spark2.0中只要创建一个SparkSession就够了,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中。

在与spark2.0交互之前必须先创建spark对象

val Spark = SparkSession
  .builder()
  .master(masterUrl)
  .appName(appName)
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

当创建好了SparkSession,我们就可以配置Spark运行相关属性。比如下面代码片段我们修改了已经存在的运行配置选项。

spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")

绝大多数的属性控制应用程序的内部设置,并且默认值都是比较合理的。下面对这些属性进行说明:

spark.app.name

  该属性没有默认值,它的含义是你的应用程序的名字,这个名字设定之后将会在WEB UI上和日志数据里面显示。如果这个属性没有设置的话,将会把你应用程序的main函数所在类的全名作为应用程序的名称。在Yarn环境下,还可以用--name或者SPARK_YARN_APP_NAME来设置应用程序的名称。为了能够方便地查看各个应用程序的含义,取一个好的名字是很重要的。

spark.master

  该属性没有默认值。这是Spark程序需要连接的集群管理器所在的URL地址。当前的spark支持三种集群方式Standalone、Apache Mesos以及YARN模式。如果这个属性在提交应用程序的时候没设置,程序将会通过System.getenv("MASTER")来获取MASTER环境变量;但是如果MASTER环境变量没有设定,那么程序将会把master的值设定为local[*],之后程序将在本地启动。

spark.executor.memory

  该属性的默认值是512m。每个executor处理器可以使用的内存大小之和,跟JVM的内存表示的字符串格式是一样的(比如: ‘512m‘,‘2g‘)。在早期版本的Spark,是通过-Xmx和-Xms来设置的。如果这个值没有设定,那么程序将会先获取SPARK_EXECUTOR_MEMORY环境变量;如果还没设置,那么获取SPARK_MEM环境变量的值;如果这个值也没设定,那么这个值将会别设定为512,。
  几乎所有运行时性能相关的内容都或多或少间接和内存大小相关。这个参数最终会被设置到Executor的JVM的heap尺寸上,对应的就是Xmx和Xms的值。

spark.serializer

  默认值是org.apache.spark.serializer.JavaSerializer。用于序列化网络传输或者以序列化形式缓存起来的各种对象的类。默认的Serializer可以对所有的Java对象进行序列化,但是它的速度十分慢!所以如果速度是影响程序运行的关键,你可以将该值设定为org.apache.spark.serializer.KryoSerializer。在一些情况下,KryoSerializer的性能可以达到JavaSerializer的10倍以上,但是相对于JavaSerializer而言,主要的问题是它不能支持所有的Java对象。当然,用户可以直接继承org.apache.spark.serializer来实现自己的Serializer。

spark.kryo.registrator

  默认值为空。如果你使用了KryoSerializer,就要为Kryo设置这个类去注册你自定义的类,该类必须继承自KryoRegistrator,实现其中的registerClasses(kryo: Kryo)即可。

spark.local.dir

  默认值为/tmp。用于设定Spark的缓存目录,包括了mapper输出的文件,缓存到磁盘的RDD数据。最好将这个属性设定为访问速度快的本地磁盘。同Hadoop一样,可以用逗号分割来设定多个不同磁盘的目录。需要注意,在Spark 1.0和之后的版本,这个属性将会被SPARK_LOCAL_DIRS (Standalone, Mesos) 或者 LOCAL_DIRS (YARN) 环境变量替代。

spark.logConf

  默认值是false。当SparkContext启动的时候,以INFO日志级别记录下有效的SparkConf 。

以上参考文档:https://www.iteblog.com/archives/1143.html#sparkappname

当创建好sparksession后,就可以读取数据了

用一个map来存储读取文件的格式

val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs:")

再读取hdfs上的数据

val data all=spark.sqlContext.read.options(options).format("com.databricks.spark.csv").load()
或者
val data all=spark.sqlContext.read.options(options).csv(filepath)

存储数据在hdfs上

val saveOption = Map("header" -> "true", "delimiter" -> "\t", "path" -> path)
data.repartition(1).write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveOption).save()

读取的数据创建临时表格

data.createOrReplaceTempView("groupData")

可用sparksession.sql对数据进行字段提取,处理

val result = spark.sql("select IP,sum(COUNT) COUNT from groupData group by IP")
时间: 2024-12-29 11:04:00

创建spark_读取数据的相关文章

五种情况下会刷新控件状态(刷新所有子FWinControls的显示)——从DFM读取数据时、新增加子控件时、重新创建当前控件的句柄时、设置父控件时、显示状态被改变时

五种情况下会刷新控件状态(刷新控件状态才能刷新所有子FWinControls的显示): 在TWinControls.PaintControls中,对所有FWinControls只是重绘了边框,而没有整个重绘这些FWinControl子控件.那么什么时候才整个重绘全部FWinControls呢?这时候,就不是一个单纯的WM_PAINT来解决控件重绘的问题了,而是这个TWinControl.UpdateShowing函数: procedure TWinControl.UpdateShowing; v

创建一个sms.db数据库俩面在创建一个message表,插入数据然后在读取数据

FMDB第三方库 导入头文件 #import "FMDatabase.h"#import "FMResultSet.h" FMDatabase *_database;//数据库对象 - (void)readData{     //1.获取数据库文件的路径     NSArray *path=NSSearchPathForDirectoriesInDomains(NSCachesDirectory, NSUserDomainMask, YES);    NSStrin

python openpyxl模块实现excel的读取,新表创建及原数据表追加新数据

当实际工作需要把excel表的数据读取出来,或者把一些统计数据写入excel表中时,一个设计丰富,文档便于寻找的模块就会显得特别的有吸引力,本文对openpyxl模块的一些常见用法做一些记录,方便工作中查询(好记性不如烂笔头) author:he    qq:760863706    python:3.5    date:2018-9-14 1:安装openpyxl pip install openpyxl 1 2:excel表读取数据(.xlsx) import openpyxlfilepat

文件操作ofstream,open,close,ifstream,fin,按照行来读取数据, fstream,iosin iosout,fio.seekg(),文件写入和文件读写,文件拷贝和文件

 1.ofstream,open,close 写入文件 #include<iostream> #include<fstream> using namespace std; //通过ofstream的方式实现写入文件 open,close void main() { ofstream fout;  //ofstream输出文件 fout.open("E:\\1.txt");//打开文件 fout << "1234abcdef";

FileInputStream利用缓冲数组读取数据

package cd.itcast.fileinputstream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; public class Demo1 { public static void main(String[] args) throws IOException { //目标文件 File fil

CRUD操作 create创建 read读取 update修改 delete删除

1.注释语法:--,#2.后缀是.sql的文件是数据库查询文件3.保存查询4.在数据库里面 列有个名字叫字段 行有个名字叫记录 CRUD操作:create 创建(添加)read 读取update 修改delete 删除 1.添加数据insert into Info values('p009','张三',1,'n001','2016-8-30 12:9:8') ; 给特定的列添加数据insert into Info (code,name) values('p010','李四');自增长列的处理in

基本数据持久性(二) 使用sqlite保存和读取数据

关于基本数据的持久性,写过一篇文章来简述过(基本数据持久性(一) 使用plist保存和读取数据).这篇文章将简述采用数据库sqlite的方式来保存数据,并根据查询结果读取数据. 一.工作原理 sqlite采用表存储的方式,表的第一行(也就是我们常说的表头)在sqilte中被称为“字段”.对于标的每一行(除了字段)的信息,都有一个独一无二的列内容可以将表的每一行内容独立区分开(例如本文所示的案例,存储一个学生的信息——学号.姓名.年龄.班级.那么,学号这一列就可以将表的每一行内容独立区分开,因为每

基本数据持久性(一) 使用plist保存和读取数据

想保存成绩.记录得分.保存账号密码等等?数据持久性可以做到这一点!这篇文章通过简单的程序,来分享一下如何使用plist来保存和读取数据,以供大家参考学习. 一.程序的主要功能 1. xib文件如图1所示. 图 1 2. 通过“保存”按钮将4个textField(ID.Name.Age.Class后面的输入框)的内容保存到plist文件中. 3.通过“读取”按钮将plist中的内容分别读取到4个textField中. 二.实现步骤 1.先创建一个Single ViewController的视图,命

C#无限极分类树-创建-排序-读取 用Asp.Net Core+EF实现之方法二:加入缓存机制

在上一篇文章中我用递归方法实现了管理菜单,在上一节我也提到要考虑用缓存,也算是学习一下.Net Core的缓存机制. 关于.Net Core的缓存,官方有三种实现: 1.In Memory Caching 我理解是在内容中实现,这种方法适用于单服务器的生产环境. 2.a Distributed Cache 分部式缓存实现. 3.Response Cache 这种方式我理解为客户端缓存. 今天我只用了第一种实现方法,内存中缓存,之所以用这种方法我是觉得我这里用缓存的初衷是为了减少访问数据库的次数,