使用Spark读写HDFS中的parquet文件
文件夹中的parquet文件
build.sbt文件
// sbt build definition for the Spark + MySQL + HBase example project.
// Each setting is its own expression; they are separated by blank lines so
// the file also parses on older sbt releases that require it.
name := "spark-hbase"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  // Spark core and Spark SQL (Dataset / parquet / JDBC support).
  "org.apache.spark" %% "spark-core" % "2.1.0",
  // JDBC driver used when writing the Dataset to MySQL.
  "mysql" % "mysql-connector-java" % "5.1.31",
  "org.apache.spark" %% "spark-sql" % "2.1.0",
  // HBase client-side modules.
  "org.apache.hbase" % "hbase-common" % "1.3.0",
  "org.apache.hbase" % "hbase-client" % "1.3.0",
  "org.apache.hbase" % "hbase-server" % "1.3.0",
  // NOTE(review): this entry is pinned to 1.2.1 while the other HBase modules
  // use 1.3.0 -- confirm it is actually needed and whether the versions should match.
  "org.apache.hbase" % "hbase" % "1.2.1"
)
Scala实现方法
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties
import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

/**
  * Created by mi on 17-4-11.
  */
case class resultset(name: String, info: String, summary: String)

/** One name/value pair from the `info` list of a [[BaikeLocation]]. */
case class IntroItem(name: String, value: String)

/** Record layout the input parquet rows are decoded into. */
case class BaikeLocation(name: String,
                         url: String = "",
                         info: Seq[IntroItem] = Seq(),
                         summary: Option[String] = None)

/** Output record layout: same as [[BaikeLocation]] but with `info` flattened to a single string. */
case class MewBaikeLocation(name: String,
                            url: String = "",
                            info: Option[String] = None,
                            summary: Option[String] = None)

/**
  * Reads a parquet dataset, flattens each record's `info` list into one
  * string, shows the result and writes it back out as parquet.
  */
object MysqlOpt {

  /**
    * Joins the items as `name:value` pairs separated by commas.
    *
    * A null `items` reference is treated like an empty list so that a row
    * without an `info` value does not abort the whole job.
    */
  private def flattenInfo(items: Seq[IntroItem]): String =
    Option(items)
      .getOrElse(Seq.empty[IntroItem])
      .map(item => item.name + ":" + item.value)
      .mkString(",")

  /**
    * Program entry point.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // A single SparkSession is the only entry point needed here; it replaces
    // the separate SparkContext / SQLContext pair.
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("spark session example")
      .getOrCreate()

    // Brings the encoders required by `.as[...]` and `.map` into scope.
    import sparkSession.implicits._

    try {
      // Read the source parquet files.
      val parquetDF = sparkSession.read.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow")
      // parquetDF.collect().take(20).foreach(println)
      // parquetDF.show()

      // BaikeLocation is the case class matching the records in the parquet files.
      val ds = parquetDF.as[BaikeLocation].map { line =>
        // The fields must be wrapped in a case class, otherwise the column
        // information is lost.
        MewBaikeLocation(
          name = line.name,
          url = line.url,
          info = Some(flattenInfo(line.info)),
          summary = line.summary)
      }.cache() // cached because the Dataset is consumed by both show() and the write below

      ds.show()
      // ds.take(2).foreach(println)

      // Write to MySQL (disabled):
      // val url = "jdbc:mysql://localhost:3306/baidubaike?useUnicode=true&characterEncoding=UTF-8"
      // val prop = new Properties()
      // prop.setProperty("user", "root")
      // prop.setProperty("password", "123456")
      // ds.write.mode(SaveMode.Append).jdbc(url, "baike_location", prop)

      // Write the transformed records out as parquet, in 10 partitions.
      ds.repartition(10).write.parquet("/home/mi/coding/coding/baikeshow_data/baikeshow1")
    } finally {
      // Release Spark resources even when the read or write fails.
      sparkSession.stop()
    }
  }
}
时间: 2024-10-10 03:00:06