直接上代码:
第一部分:
/** Row type pairing a visit key (`pk_dcpv`) with its objective PACS result text
  * (`result_obj`). Used as the Dataset element / Parquet schema for the
  * objective-result pipeline. Marked `final`: case classes should not be
  * subclassed (equals/copy semantics break under inheritance).
  */
final case class OrdPacsresult_obj(pk_dcpv: String, result_obj: String)
第二部分:
/** Aggregates subjective PACS result text per visit and writes one Parquet file.
  *
  * Reads rows with a non-null `result_subj` from `diagbot.ord_rec_pacs_filter`,
  * groups the subjective texts by `pk_dcpv` (deduplicating repeated texts),
  * and writes the result to HDFS as a single Parquet file.
  *
  * Side effects: runs a Spark SQL query and writes to HDFS; returns nothing.
  * NOTE(review): relies on `sparkSession` and the case class `OrdPacsresult_subj`
  * (fields `pk_dcpv`, `result_subj`) declared elsewhere in this file — confirm.
  */
def ordsubj: Unit = {
  import sparkSession.implicits._
  import sparkSession.sql

  val dataset: Dataset[OrdPacsresult_subj] =
    sql("select pk_dcpv,result_obj,result_subj from diagbot.ord_rec_pacs_filter where result_subj is not null")
      .as[OrdPacsresult_subj]

  // Key each subjective result by its visit id.
  val pairs: Dataset[(String, String)] = dataset.map(x => (x.pk_dcpv, x.result_subj))

  val rdd: RDD[(String, String)] = pairs.rdd.repartition(100)

  // BUG FIX: the original merge built `x + "\n"` in a StringBuffer and skipped
  // `y` when `buffer.toString.contains(y)` — a substring test that falsely
  // drops any result that happens to be contained in an earlier one, and a
  // merge function that is not associative (reduceByKey may combine partial
  // aggregates in any order), while also accumulating extra trailing newlines.
  // A line-wise `distinct` merge is associative, order-stable, and dedupes
  // on whole entries only.
  val merged: RDD[(String, String)] = rdd.reduceByKey { (x, y) =>
    (x.split("\n") ++ y.split("\n")).distinct.mkString("\n")
  }

  val f: DataFrame = merged.map { case (pk, subj) => OrdPacsresult_subj(pk, subj) }.toDF()

  // Coalesce to a single output file at the same HDFS destination as before.
  f.repartition(1).write.parquet("hdfs://192.168.2.232:9000/datas/parquetFile/OrdPacsresult_subj")
}
原文地址:https://www.cnblogs.com/kwzblog/p/10180234.html
时间: 2024-11-11 04:38:09