Spark工程开发前台技术实现与后台函数调用

Spark是一个通用的大规模数据快速处理引擎。可以简单理解为Spark就是一个大数据分布式处理框架。基于内存计算的Spark的计算速度要比Hadoop的MapReduce快上50倍以上,基于磁盘的计算速度也快于10倍以上。Spark运行在Hadoop第二代的yarn集群管理之上,可以轻松读取Hadoop的任何数据。能够读取HBase、HDFS等Hadoop的数据源。

从Spark 1.0版本起,Spark开始支持Spark SQL,它最主要的用途之一就是能够直接从Spark平台上面获取数据。并且Spark SQL提供比较流行的Parquet列式存储格式以及从Hive表中直接读取数据的支持。之后,Spark SQL还增加了对JSON等其他格式的支持。到了Spark 1.3 版本Spark还可以使用SQL的方式进行DataFrames的操作。我们通过JDBC的方式通过前台业务逻辑执行相关sql的增删改查,通过远程连接linux对文件进行导入处理,使项目能够初步支持Spark平台,现如今已支持Spark1.6版本。那么从应用的前台与后台两个部分来简介基于Spark的项目开发实践。

前台:

1、  JDBC连接方式。

前台我们使用ThriftServer连接后台SparkSQL,它是一个JDBC/ODBC接口,通过配置Hive-site.xml,就可以使前台用JDBC/ODBC连接ThriftServer来访问HDFS的数据。ThriftServer通过调用hive元数据信息找到表或文件信息在hdfs上的具体位置,并通过Spark的RDD实现了hive的接口。对于业务的增、删、改、查都是通过SparkSQL对HDFS上存储的相应表文件进行操作。项目前台中需要引入相应hive-jdbc等的jar包。

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>${hadoop-common.version}</version>

<exclusions>

<exclusion>

<artifactId>jdk.tools</artifactId>

<groupId>jdk.tools</groupId>

</exclusion>

<exclusion>

<groupId>tomcat</groupId>

<artifactId>jasper-compiler</artifactId>

</exclusion>

<exclusion>

<groupId>tomcat</groupId>

<artifactId>jasper-runtime</artifactId>

</exclusion>

<exclusion>

<groupId>javax.servlet.jsp</groupId>

<artifactId>jsp-api</artifactId>

</exclusion>

</exclusions>

</dependency>

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-common</artifactId>

<version>${hive-common.version}</version>

</dependency>

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-exec</artifactId>

<version>${hive-exec.version}</version>

</dependency>

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-jdbc</artifactId>

<version>${hive-jdbc.version}</version>

</dependency>

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-metastore</artifactId>

<version>${hive-metastore.version}</version>

</dependency>

<dependency>

<groupId>org.apache.hive</groupId>

<artifactId>hive-service</artifactId>

<version>${hive-service.version}</version>

</dependency>

<dependency>

<groupId>org.apache.httpcomponents</groupId>

<artifactId>httpclient</artifactId>

<version>${httpclient.version}</version>

</dependency>

<dependency>

<groupId>org.apache.thrift</groupId>

<artifactId>libfb303</artifactId>

<version>${libfb303.version}</version>

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-api</artifactId>

<version>${slf4j-api.version}</version>

</dependency>

<dependency>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

<version>${slf4j-log4j12.version}</version>

</dependency>

<dependency>

<groupId>com.jcraft</groupId>

<artifactId>jsch</artifactId>

<version>${jsch.version}</version>

</dependency>

2、Parquet列式文件存储格式

我们使用Parquet面向列存存储的文件存储结构现如今的Spark版本已经支持了列式存储格式parquet,因为Parquet具有高压缩比的特点且适合嵌套数据类型的存储,能够避免不必要的IO性能。但在Spark1.3时并没有默认支持,这里就不再对该文件格式进行过多的说明,创建parquet格式表结构建表语句如下:

Create table yangsy as select * from table name

3、数据的导入。

使用的是Apache的一个项目,最早作为Hadoop的一个第三方模块存在,主要功能是在Hadoop(hive)与传统的数据库(mysql、oracle等)间进行数据的传递,可以将一个关系型数据库中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导进到关系数据库中。

这里注意,执行sqoop导入需要占用yarn的资源进行mapreduce,由于spark开启后即便在空闲状态下也不释放内存,故修改spark-env.sh配置降低memory或暂时停用thfitserver,分以便运行sqoop。

Create job -f 3 -t 4

Creating job for links with from id 3 and to id 4

Please fill following values to create new job object

Name: Sqoopy

From database configuration

Schema name: hive

Table name: TBLS

Table SQL statement:

Table column names:

Partition column name:

Null value allowed for the partition column:

Boundary query:

ToJob configuration

Output format:

0 : TEXT_FILE

1 : SEQUENCE_FILE

Choose: 0

Compression format:

successfully created with validation status OK  and persistent id 2

0 : NONE

1 : DEFAULT

2 : DEFLATE

3 : GZIP

4 : BZIP2

5 : LZO

6 : LZ4

7 : SNAPPY

8 : CUSTOM

Choose: 0

Custom compression format:

Output directory: hdfs://hadoop000:8020/sqoop2

Throttling resources

Extractors:

Loaders:

New job was

4、前后台的交互实现工具类。

工具类提供静态的方法,可以进行相应业务逻辑的调用,由于Hadoop集群存在于服务器端,前台需要实现跨平台服务器的连接,才能执行相应的Hadoop命令,实现对HDFS上文件的操作。此次设计的ShellUtils类,通过jsch连接Linux服务器执行shell命令.需要引入jsch的jar包:

<dependency>

<groupId>com.jcraft</groupId>

<artifactId>jsch</artifactId>

<version>${jsch.version}</version>

</dependency>

private static JSch jsch;  
    private static Session session;  
    public static void connect(String user, String passwd, String host) throws JSchException {  
        jsch = new JSch();  
        session = jsch.getSession(user, host,22);  
        session.setPassword(passwd);  
        java.util.Properties config = new java.util.Properties();  

config.put("StrictHostKeyChecking", "no");

session.setConfig(config);

5、数据的下载:

通过传入的Linux命令、用户名、密码等参数对远程linux服务器进行连接。调用hadoop的cat命令直接将文件从HDFS上合并下来通过ftp方式传入tomcat所在服务器,拿到相应的清单文件,大大减少了读取生成文件所需要的时间。命令如下:

String command = "cd " + ftpPath + " && " + hadoopPath + "hadoop fs -cat ‘" +  hdfsPath+ listRandomName + "/*‘>" + listName1+".csv;"+ "sed -i ‘1i"+ title +"‘ " + listName1+".csv;" 
 
 CodecUtil类,用来实现不同类型压缩文件的解压工作,通过传入的压缩类型,利用反射机制锁定压缩的类型,由于存储在hdfs上的文件都是以文件块的形式存在的,所以首先需要获取hdfs中文件的二级子目录,遍历查询到每一个文件块的文件路径,随后通过输入输出流进行文件的解压工作。然后将此类打包成jar包放入集群中,通过前台远程连接服务端,执行hadoop命令操作执行,实现类部分代码如下:

public class CodecUtil{

public static void main(String[] args) throws Exception {

//compress("org.apache.hadoop.io.compress.GzipCodec");

String listName = args[0];

String codecType = args[1];

String hdfsPath = args[2];

uncompress(listName,codecType,hdfsPath);

//解压缩

public static void uncompress(String listName,String CodecType,String hdfsPath) throws Exception{

Class<?> codecClass = Class.forName(CodecType);

Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);

Path listf =new Path(hdfsPath+listName);

//获取根目录下的所有2级子文件目录

FileStatus stats[]=fs.listStatus(listf);

CompressionCodec codec = (CompressionCodec)

ReflectionUtils.newInstance(codecClass, conf);

int i;

for ( i = 0; i < stats.length; i++){

//获得子文件块的文件路径

String Random = findRandom();

Path list = new Path(stats[i].getPath().toString());

InputStream in = codec.createInputStream(inputStream);

FSDataOutputStream output = fs.create(new Path(hdfsPath + listName+"/"+unListName));

IOUtils.copyBytes(in, output, conf);

IOUtils.closeStream(in);

}

}

}

 
6、功能性导入文件
通过功能选择,将需要导入的CSV文件通过ftp方式上传到Spark所在服务器,再将文件通过load的方式导入表中,实现导入文件的业务导入。执行sql如下:

String sql = " LOAD DATA LOCAL INPATH ‘" + Path + fileName

+ "‘ OVERWRITE INTO TABLE " + tabName;

 
7、获取表头信息。
可以使用describe table,从而获取HDFS上对应的表头信息,从而根据业务进行相应的业务逻辑处理。
8、JDBC连接问题
这里简要说一下执行的性能问题,我们通过JDBC方式提交SQL给spark,倘若SQL中含有大量的窗口函数像row_number over()一类的,在大数据量的情况下会造成任务执行完毕,但前台jdbc卡死,程序无法继续进行的情况。这是由于像窗口函数以及聚合函数都是相当于MapReduce的Shuffle操作。在提交至Spark运行过程中, DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),map的中间结果是写入到本地硬盘的,而不是内存,所以对磁盘的读写要求非常高,(最好是固态硬盘比较快,本人亲自尝试,同样的性能参数下,固态硬盘会比普通磁盘快10倍。还有,通过调大shuffle partition的数目从而减少每个task所运算的时间,可以减少运行的时间,不至于前台卡死。但不建议前台使用窗口函数进行业务逻辑处理,前台卡死的几率还是很大。
 
9、性能调优部分参数
Spark默认序列化方式为Java的ObjectOutputStream序列化一个对象,速度较慢,序列化产生的结果有时也比较大。所以项目中我们使用kryo序列化方式,通过kryo序列化,使产生的结果更为紧凑,减少内存的占用空间,同时减少了对象本身的元数据信息与基本数据类型的开销,从而更好地提高了性能。
Spark默认用于缓存RDD的空间为一个executor的60%,项目中由于考虑到标签数量为成百个,使用同样规则与数量的标签进行客户群探索及客户群生成的概率很小。所以修改spark.storage.memoryFaction=0.4,这样使百分之60%的内存空间可以在task执行过程中缓存创建新对象,从而加大task的任务执行效率,以及spark.shuffle.memoryFraction参数。不过从至今Spark1.6已经动态的调整计算内存与缓存内存的大小,这个参数也可不比手动配置,具体要根据项目是缓存的数据还是计算数据的比例来决定。
 
10、decimal数据类型改为double数据类型
Decimal数据类型在spark1.3及spark1.4版本无法更好的支持parquet文件格式,生成文件时会报无法识别该类型,现如今的版本已经更加优化了decimal,但具体是否支持暂时尚未尝试。
至此,前台的相关方法就介绍完毕,开始后台
 

后台:

所谓的后台,就是进行真正的数据处理,用Scala编写处理逻辑生成jar包提交于spark-submit,生成从而服务于上层应用的数据表。

1、  环境变量的加载

val sparkConf = new SparkConf()

val sc: SparkContext = new SparkContext(sparkConf)

val sqlContext = new HiveContext(sc)

这里可以通过直接set参数从而告诉spark要申请多少内存,多少个核,启动多少个executer例如:

val sparkConf = new SparkConf().setMaster("yarn").setAppName("app")

.set("spark.executor.memory", "4g")

不过不建议在代码中写死,可以写个配置文件加载类往里面传入参数,也可以通过在提交spark-submit的时候指定参数:

./bin/spark-submit --conf spark.ui.port=4444 --name "app" --master yarn-client --num-executors(num) --executor-cores (num) --executor-memory (num) --class main.asiainfo.coc.CocDss $CocBackHome/jar_name.jar

加载mysql中的配置信息表,从而进行相应的业务逻辑处理:

val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"

val table_name = sqlContext.jdbc(mySQLUrl,"table_name").cache()

2、多表关联:

val table_join = table1.join(table2, table1 ("id") === table2 ("id"),"inner") 或

Val table_join = table1.join(table2,”id”)

这里要注意一点,多表进行join的时候很容易造成ID冲突,由于应用于生产环境的依旧是Spark1.4版本(Spark1.5,1.6是否稳定有待测试,所以暂时没有用),所以还是使用第一种方法稳妥,该方法为Spark1.3的使用方法,毕竟稳定第一。

2、  读取本地数据文件,根据某个字段排序并注册成表:

case class test(column1 : String,column2: String,column3: String)

(这里要注意,case class一定要写在业务处理方法之外)

val loadData = sc.textFile(path)

val data = loadData.map(_.split(",")).sortBy(line => line.indexOf(1))

val dataTable = sqlContext.repartition(400).createDataFrame(data).registerTempTable("test")

sqlContext.sql("drop table if exists asiainfo_yangsy")

sqlContext.sql("select * from test").toDF().saveAsTable("asiainfo_yangsy ")

这里要注意的是,要调用registerTempTable函数,必须调用createDataFrame,经过资料查阅,读取文件生成的RDD只是个普通的RDD,而registerTempTable并不属于RDD类,所以通过创建SchemaRDD实例进行调用。随后注册成表后,转化为DataFrame,保存表至HDFS。,

顺便提一下repartition函数,通过此函数来设置patition的数量。因为一个partition对应的就是stage的一个task,那么根据真实的数据量进行设置,从而减少OOM的可能性。不过现如今Spark1.6版本已经支持自行调整parition数量,代码中可不比添加。

4、读取HDFS中的表或数据文件:

val loadData2 = sqlContext.read.table("asiainfo_")

5、describe函数

val select_table_cache = sqlContext.table(save_table_name)

al describeTable1 = select_table_cache.describe().show()

这里要强调下,describe函数的调用并不是前台那样,获取表头信息。而是获取相应列数据的count、mean、stddev、min以及max值。用于做一些简单的统计。

6、根据join后的DF生成需要业务数据的DF,并根据某个table某一字段进行排序

val select_table= table.select(table1("id"),

table("update_cycle"), table1("column_id"),

table2("table_id")).repartition(400).sort(table1("label_id"))

7、数据的MapReduce

val loadData = sc.textFile(path)

val map_reduce = loadData.

flatMap(line =>line.split(",")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)

剩下还有很多使用的函数就不一一说明了,具体应用查官网API即可。

时间: 2024-08-02 06:48:43

Spark工程开发前台技术实现与后台函数调用的相关文章

第9节课笔记-彻底实战IntelliJ IDEA 下的Spark程序开发

彻底实战IntelliJ IDEA 下的Spark程序开发下载IntelliJ IDEA 下载gitSpark源码下载:git clone git://github.com/apache/spark.git导入maven 工程 IntelliJ IDEA 启动的向导中Sacal下载需要下载,这是IDEA下载的,和系统层的不一样4.指定JDK1.8.x和Scala2.10.45.file ->Project Stucture 来设置工程lib 核心是添加Spark的jar6.添加Spark jar

Spark架构开发 大数据视频教程 SQL Streaming Scala Akka Hadoop

培训Spark架构开发! 从基础到高级,一对一培训![技术QQ:2937765541] --------------------------------------------------------------------------------------------------------------------------------------- 课程体系: 获取视频资料和培训解答技术支持地址 课程展示(大数据技术很广,一直在线为你培训解答!): 获取视频资料和培训解答技术支持地址

基于IDEA使用Spark API开发Spark程序

清明假期折腾了两天,总结了两种方式使用IDE进行spark程序,记录一下: 第一种方法比较简单,两种方式都是采用SBT进行编译的. 注意:本地不需要安装Scala程序,否则在编译程序时有版本兼容性问题. 一.基于Non-SBT方式 创建一个Scala IDEA工程 我们使用Non-SBT的方式,点击"Next" 命名工程,其他按照默认 点击"Finish"完成工程的创建 修改项目的属性 首先修改Modules选项 在src下创建两个文件夹,并把其属性改为source

Android开发的技术层次

任何一种移动开发生态系统其技术人员都是呈现金字塔式分布的.我借此也说说Developer和Programmer的区别: Programmer是真正意义上的程序员,写程序的.灵魂级 Developer是码农,搬砖的,砌代码,体力活.肉体级 做Developer的感觉就仿佛行尸走肉,被工作绑架,被代码绑架,而不是驾驭代码.如果每天,你感觉自己仿佛就是一坨肉,每天把自己运到公司, 灵魂出窍式,条件反射式地CODING,然后下班又运回家,回到家你才真正解脱,真正找到自我.富士康的小弟小妹们也有同样感觉.

Windows下基于eclipse的Spark应用开发环境搭建

原创文章,转载请注明: 转载自www.cnblogs.com/tovin/p/3822985.html 一.软件下载 maven下载安装 :http://10.100.209.243/share/soft/apache-maven-3.2.1-bin.zip       jdk下载安装:          http://10.100.209.243/share/soft/jdk-7u60-windows-i586.exe(32位)         http://10.100.209.243/sh

Spark API编程动手实战-08-基于IDEA使用Spark API开发Spark程序-01

创建一个Scala IDEA工程: 点击“Next”: 点击“Finish”完成工程的创建: 修改项目的属性: 首先修改Modules选项: 在src下创建两个文件夹,并把其属性改为source: 再修改Libraries: 因为要开发Spark程序,所以需要把Spark的开发需要的jar包导进来: 导入包完成后,在工程的scala下面创建一个package: 创建一个Object对象: 完成初始类的创建: 首先构建Spark Driver的模板代码: 该程序是对前面的搜狗日志的处理代码,只不过

Git工程开发实践(二)——Git内部实现机制

Git工程开发实践(二)--Git内部实现机制 一.Git仓库内部实现简介 Git本质上是一个内容寻址(content-addressable)的文件系统,根据文件内容的SHA-1哈希值来定位文件.Git核心部分是一个简单的键值对数据库(key-value data store).向Git数据库插入任意类型的内容,会返回一个键值,通过返回的键值可以在任意时刻再次检索(retrieve)插入的内容.通过底层命令hash-object可以将任意数据保存到.git目录并返回相应的键值.Git包含一套面

Git工程开发实践(七)——GitLab服务搭建

Git工程开发实践(七)--GitLab服务搭建 操作系统:RHEL 7.3 WorkStation 一.GitLab简介 1.GitLab简介 ?GitLab是一个利用Ruby on Rails开发的开源版本管理系统,是集代码托管.测试.部署于一体的开源git仓库管理软件,可通过web界面来进行访问公开或私人项目.GitLab能够浏览代码,管理缺陷和注释,可以管理团队对仓库的访问,非常易于浏览提交过的版本,并提供一个文件历史库,是目前非常流行的研发版本控制系统.Git:本地版本控制系统工具.G

spark JAVA 开发环境搭建及远程调试

spark JAVA 开发环境搭建及远程调试 以后要在项目中使用Spark 用户昵称文本做一下聚类分析,找出一些违规的昵称信息.以前折腾过Hadoop,于是看了下Spark官网的文档以及 github 上 官方提供的examples,看完了之后决定动手跑一个文本聚类的demo,于是有了下文. 1. 环境介绍 本地开发环境是:IDEA2018.JDK8.windows 10.远程服务器 Ubuntu 16.04.3 LTS上安装了spark-2.3.1-bin-hadoop2.7 看spark官网