Spark是一个通用的大规模数据快速处理引擎。可以简单理解为Spark就是一个大数据分布式处理框架。基于内存计算的Spark的计算速度要比Hadoop的MapReduce快上50倍以上,基于磁盘的计算速度也快于10倍以上。Spark运行在Hadoop第二代的yarn集群管理之上,可以轻松读取Hadoop的任何数据。能够读取HBase、HDFS等Hadoop的数据源。
从Spark 1.0版本起,Spark开始支持Spark SQL,它最主要的用途之一就是能够直接从Spark平台上面获取数据。并且Spark SQL提供比较流行的Parquet列式存储格式以及从Hive表中直接读取数据的支持。之后,Spark SQL还增加了对JSON等其他格式的支持。到了Spark 1.3 版本Spark还可以使用SQL的方式进行DataFrames的操作。我们通过JDBC的方式通过前台业务逻辑执行相关sql的增删改查,通过远程连接linux对文件进行导入处理,使项目能够初步支持Spark平台,现如今已支持Spark1.6版本。那么从应用的前台与后台两个部分来简介基于Spark的项目开发实践。
前台:
1、 JDBC连接方式。
前台我们使用ThriftServer连接后台SparkSQL,它是一个JDBC/ODBC接口,通过配置Hive-site.xml,就可以使前台用JDBC/ODBC连接ThriftServer来访问HDFS的数据。ThriftServer通过调用hive元数据信息找到表或文件信息在hdfs上的具体位置,并通过Spark的RDD实现了hive的接口。对于业务的增、删、改、查都是通过SparkSQL对HDFS上存储的相应表文件进行操作。项目前台中需要引入相应hive-jdbc等的jar包。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop-common.version}</version>
<exclusions>
<exclusion>
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive-common.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive-exec.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive-jdbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>${hive-metastore.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive-service.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<version>${libfb303.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
2、Parquet列式文件存储格式
我们使用Parquet面向列存存储的文件存储结构现如今的Spark版本已经支持了列式存储格式parquet,因为Parquet具有高压缩比的特点且适合嵌套数据类型的存储,能够避免不必要的IO性能。但在Spark1.3时并没有默认支持,这里就不再对该文件格式进行过多的说明,创建parquet格式表结构建表语句如下:
Create table yangsy as select * from table name
3、数据的导入。
使用的是Apache的一个项目,最早作为Hadoop的一个第三方模块存在,主要功能是在Hadoop(hive)与传统的数据库(mysql、oracle等)间进行数据的传递,可以将一个关系型数据库中的数据导入到Hadoop的HDFS中,也可以将HDFS的数据导进到关系数据库中。
这里注意,执行sqoop导入需要占用yarn的资源进行mapreduce,由于spark开启后即便在空闲状态下也不释放内存,故修改spark-env.sh配置降低memory或暂时停用thfitserver,分以便运行sqoop。
Create job -f 3 -t 4
Creating job for links with from id 3 and to id 4
Please fill following values to create new job object
Name: Sqoopy
From database configuration
Schema name: hive
Table name: TBLS
Table SQL statement:
Table column names:
Partition column name:
Null value allowed for the partition column:
Boundary query:
ToJob configuration
Output format:
0 : TEXT_FILE
1 : SEQUENCE_FILE
Choose: 0
Compression format:
successfully created with validation status OK and persistent id 2
0 : NONE
1 : DEFAULT
2 : DEFLATE
3 : GZIP
4 : BZIP2
5 : LZO
6 : LZ4
7 : SNAPPY
8 : CUSTOM
Choose: 0
Custom compression format:
Output directory: hdfs://hadoop000:8020/sqoop2
Throttling resources
Extractors:
Loaders:
New job was
4、前后台的交互实现工具类。
工具类提供静态的方法,可以进行相应业务逻辑的调用,由于Hadoop集群存在于服务器端,前台需要实现跨平台服务器的连接,才能执行相应的Hadoop命令,实现对HDFS上文件的操作。此次设计的ShellUtils类,通过jsch连接Linux服务器执行shell命令.需要引入jsch的jar包:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>${jsch.version}</version>
</dependency>
private static JSch jsch;
private static Session session;
public static void connect(String user, String passwd, String host) throws JSchException {
jsch = new JSch();
session = jsch.getSession(user, host,22);
session.setPassword(passwd);
java.util.Properties config = new java.util.Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
5、数据的下载:
通过传入的Linux命令、用户名、密码等参数对远程linux服务器进行连接。调用hadoop的cat命令直接将文件从HDFS上合并下来通过ftp方式传入tomcat所在服务器,拿到相应的清单文件,大大减少了读取生成文件所需要的时间。命令如下:
String command = "cd " + ftpPath + " && " + hadoopPath + "hadoop fs -cat ‘" + hdfsPath+ listRandomName + "/*‘>" + listName1+".csv;"+ "sed -i ‘1i"+ title +"‘ " + listName1+".csv;"
CodecUtil类,用来实现不同类型压缩文件的解压工作,通过传入的压缩类型,利用反射机制锁定压缩的类型,由于存储在hdfs上的文件都是以文件块的形式存在的,所以首先需要获取hdfs中文件的二级子目录,遍历查询到每一个文件块的文件路径,随后通过输入输出流进行文件的解压工作。然后将此类打包成jar包放入集群中,通过前台远程连接服务端,执行hadoop命令操作执行,实现类部分代码如下:
public class CodecUtil{
public static void main(String[] args) throws Exception {
//compress("org.apache.hadoop.io.compress.GzipCodec");
String listName = args[0];
String codecType = args[1];
String hdfsPath = args[2];
uncompress(listName,codecType,hdfsPath);
//解压缩
public static void uncompress(String listName,String CodecType,String hdfsPath) throws Exception{
Class<?> codecClass = Class.forName(CodecType);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path listf =new Path(hdfsPath+listName);
//获取根目录下的所有2级子文件目录
FileStatus stats[]=fs.listStatus(listf);
CompressionCodec codec = (CompressionCodec)
ReflectionUtils.newInstance(codecClass, conf);
int i;
for ( i = 0; i < stats.length; i++){
//获得子文件块的文件路径
String Random = findRandom();
Path list = new Path(stats[i].getPath().toString());
InputStream in = codec.createInputStream(inputStream);
FSDataOutputStream output = fs.create(new Path(hdfsPath + listName+"/"+unListName));
IOUtils.copyBytes(in, output, conf);
IOUtils.closeStream(in);
}
}
}
6、功能性导入文件
通过功能选择,将需要导入的CSV文件通过ftp方式上传到Spark所在服务器,再将文件通过load的方式导入表中,实现导入文件的业务导入。执行sql如下:
String sql = " LOAD DATA LOCAL INPATH ‘" + Path + fileName
+ "‘ OVERWRITE INTO TABLE " + tabName;
7、获取表头信息。
可以使用describe table,从而获取HDFS上对应的表头信息,从而根据业务进行相应的业务逻辑处理。
8、JDBC连接问题
这里简要说一下执行的性能问题,我们通过JDBC方式提交SQL给spark,倘若SQL中含有大量的窗口函数像row_number over()一类的,在大数据量的情况下会造成任务执行完毕,但前台jdbc卡死,程序无法继续进行的情况。这是由于像窗口函数以及聚合函数都是相当于MapReduce的Shuffle操作。在提交至Spark运行过程中, DAGScheduler会把Shuffle的过程切分成map和reduce两个Stage(之前一直被我叫做shuffle前和shuffle后),map的中间结果是写入到本地硬盘的,而不是内存,所以对磁盘的读写要求非常高,(最好是固态硬盘比较快,本人亲自尝试,同样的性能参数下,固态硬盘会比普通磁盘快10倍。还有,通过调大shuffle partition的数目从而减少每个task所运算的时间,可以减少运行的时间,不至于前台卡死。但不建议前台使用窗口函数进行业务逻辑处理,前台卡死的几率还是很大。
9、性能调优部分参数
Spark默认序列化方式为Java的ObjectOutputStream序列化一个对象,速度较慢,序列化产生的结果有时也比较大。所以项目中我们使用kryo序列化方式,通过kryo序列化,使产生的结果更为紧凑,减少内存的占用空间,同时减少了对象本身的元数据信息与基本数据类型的开销,从而更好地提高了性能。
Spark默认用于缓存RDD的空间为一个executor的60%,项目中由于考虑到标签数量为成百个,使用同样规则与数量的标签进行客户群探索及客户群生成的概率很小。所以修改spark.storage.memoryFaction=0.4,这样使百分之60%的内存空间可以在task执行过程中缓存创建新对象,从而加大task的任务执行效率,以及spark.shuffle.memoryFraction参数。不过从至今Spark1.6已经动态的调整计算内存与缓存内存的大小,这个参数也可不比手动配置,具体要根据项目是缓存的数据还是计算数据的比例来决定。
10、decimal数据类型改为double数据类型
Decimal数据类型在spark1.3及spark1.4版本无法更好的支持parquet文件格式,生成文件时会报无法识别该类型,现如今的版本已经更加优化了decimal,但具体是否支持暂时尚未尝试。
至此,前台的相关方法就介绍完毕,开始后台
后台:
所谓的后台,就是进行真正的数据处理,用Scala编写处理逻辑生成jar包提交于spark-submit,生成从而服务于上层应用的数据表。
1、 环境变量的加载
val sparkConf = new SparkConf()
val sc: SparkContext = new SparkContext(sparkConf)
val sqlContext = new HiveContext(sc)
这里可以通过直接set参数从而告诉spark要申请多少内存,多少个核,启动多少个executer例如:
val sparkConf = new SparkConf().setMaster("yarn").setAppName("app")
.set("spark.executor.memory", "4g")
不过不建议在代码中写死,可以写个配置文件加载类往里面传入参数,也可以通过在提交spark-submit的时候指定参数:
./bin/spark-submit --conf spark.ui.port=4444 --name "app" --master yarn-client --num-executors(num) --executor-cores (num) --executor-memory (num) --class main.asiainfo.coc.CocDss $CocBackHome/jar_name.jar
加载mysql中的配置信息表,从而进行相应的业务逻辑处理:
val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"
val table_name = sqlContext.jdbc(mySQLUrl,"table_name").cache()
2、多表关联:
val table_join = table1.join(table2, table1 ("id") === table2 ("id"),"inner") 或
Val table_join = table1.join(table2,”id”)
这里要注意一点,多表进行join的时候很容易造成ID冲突,由于应用于生产环境的依旧是Spark1.4版本(Spark1.5,1.6是否稳定有待测试,所以暂时没有用),所以还是使用第一种方法稳妥,该方法为Spark1.3的使用方法,毕竟稳定第一。
2、 读取本地数据文件,根据某个字段排序并注册成表:
case class test(column1 : String,column2: String,column3: String)
(这里要注意,case class一定要写在业务处理方法之外)
val loadData = sc.textFile(path)
val data = loadData.map(_.split(",")).sortBy(line => line.indexOf(1))
val dataTable = sqlContext.repartition(400).createDataFrame(data).registerTempTable("test")
sqlContext.sql("drop table if exists asiainfo_yangsy")
sqlContext.sql("select * from test").toDF().saveAsTable("asiainfo_yangsy ")
这里要注意的是,要调用registerTempTable函数,必须调用createDataFrame,经过资料查阅,读取文件生成的RDD只是个普通的RDD,而registerTempTable并不属于RDD类,所以通过创建SchemaRDD实例进行调用。随后注册成表后,转化为DataFrame,保存表至HDFS。,
顺便提一下repartition函数,通过此函数来设置patition的数量。因为一个partition对应的就是stage的一个task,那么根据真实的数据量进行设置,从而减少OOM的可能性。不过现如今Spark1.6版本已经支持自行调整parition数量,代码中可不比添加。
4、读取HDFS中的表或数据文件:
val loadData2 = sqlContext.read.table("asiainfo_")
5、describe函数
val select_table_cache = sqlContext.table(save_table_name)
al describeTable1 = select_table_cache.describe().show()
这里要强调下,describe函数的调用并不是前台那样,获取表头信息。而是获取相应列数据的count、mean、stddev、min以及max值。用于做一些简单的统计。
6、根据join后的DF生成需要业务数据的DF,并根据某个table某一字段进行排序
val select_table= table.select(table1("id"),
table("update_cycle"), table1("column_id"),
table2("table_id")).repartition(400).sort(table1("label_id"))
7、数据的MapReduce
val loadData = sc.textFile(path)
val map_reduce = loadData.
flatMap(line =>line.split(",")).map(w =>(w,1)).reduceByKey(_+_).foreach(println)
剩下还有很多使用的函数就不一一说明了,具体应用查官网API即可。