Straight to the code:
package com.jason.spark23

import org.apache.spark.sql.SparkSession
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

object WriteTest {

  implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {
    // Extension method: read a text file using a custom record delimiter
    // instead of the default newline.
    def textFile(
        path: String,
        delimiter: String,
        maxRecordLength: String = "1000000"
    ): RDD[String] = {
      val conf = new Configuration(sc.hadoopConfiguration)
      // This configuration sets the record delimiter:
      conf.set("textinputformat.record.delimiter", delimiter)
      // and this one limits the size of one record:
      conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)
      sc.newAPIHadoopFile(
          path,
          classOf[TextInputFormat],
          classOf[LongWritable],
          classOf[Text],
          conf
        )
        .map { case (_, text) => text.toString }
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("readtest")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    /*val pathjson = "C:\\notos\\code\\sparktest\\src\\main\\resources\\employees.json"
    println("====json df")
    // spark.read.json infers column types for the schema automatically
    val jsonDf = spark.read.json(pathjson)
    jsonDf.show()
    //jsonDf.write.format("text").save("C:\\notos\\code\\sparktest\\src\\main\\resources\\text")
    jsonDf.rdd.saveAsTextFile("")*/

    val pathtxt = "C:\\notos\\code\\sparktest\\src\\main\\resources\\people2.txt"
    // Attempt via the DataFrame reader, passing the delimiter as a read option:
    val dd = spark.read.option("textinputformat.record.delimiter", "||").format("text").load(pathtxt)
    dd.show()
    dd.rdd.collect.foreach(println)

    // RDD approach: this call resolves to the extension method defined above.
    val sc = spark.sparkContext
    val people2 = sc.textFile(pathtxt, "||")
    people2.collect().foreach(println)

    spark.stop()
  }
}
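The original post does not show the contents of people2.txt. As a hedged, self-contained sketch (the file contents, the temp-file path, and the DelimiterDemo object name are made up for illustration, and it is assumed to sit in the same package as WriteTest so the implicit class can be imported), the following writes a tiny file whose records are separated by || and reads it back through the extension method:

package com.jason.spark23

import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object DelimiterDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical input: three records separated by "||" instead of newlines.
    val tmp = Files.createTempFile("people2", ".txt")
    Files.write(tmp, "Michael,29||Andy,30||Justin,19".getBytes(StandardCharsets.UTF_8))

    val spark = SparkSession.builder().appName("delimiter-demo").master("local").getOrCreate()
    val sc = spark.sparkContext

    // Bring the implicit class into scope so sc gains textFile(path, delimiter).
    import WriteTest.ContextExtensions
    val records = sc.textFile(tmp.toString, "||")
    records.collect().foreach(println)
    // Expected output:
    // Michael,29
    // Andy,30
    // Justin,19

    spark.stop()
  }
}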
Scala's implicit conversion is used here: SparkContext itself has no textFile(path, delimiter) overload, so when sc.textFile(path, delimiter) is called the compiler automatically wraps sc in ContextExtensions and invokes its textFile method.
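For readers less familiar with this mechanism, here is a minimal standalone sketch of an implicit class (the ImplicitClassDemo, StringExtensions, and shout names are hypothetical, not from the post); the same resolution rule is what routes sc.textFile(path, "||") to ContextExtensions above:

object ImplicitClassDemo {
  // An implicit class wraps an existing type; extending AnyVal makes it a
  // value class, so no wrapper object is allocated at runtime.
  implicit class StringExtensions(val s: String) extends AnyVal {
    // Hypothetical extension method, analogous to textFile(path, delimiter).
    def shout: String = s.toUpperCase + "!"
  }

  def main(args: Array[String]): Unit = {
    // String has no shout method, so the compiler looks for an implicit
    // conversion in scope and wraps the value in StringExtensions.
    println("hello".shout) // prints: HELLO!
  }
}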
Original post: https://www.cnblogs.com/jason-dong/p/9653015.html