Spark&Hive:如何使用scala开发spark作业,并访问hive。

  • 背景:

接到任务,需要在一个一天数据量在460亿条记录的hive表中,筛选出某些host为特定的值时才解析该条记录的http_content中的经纬度:

解析规则譬如:

需要解析host: api.map.baidu.com
需要解析的规则:"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},
"confidence":25
需要解析http_conent:renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"formatted_address":"???????????????????????????????????????","business":"","addressComponent":{"country":"??????","country_code":0,"province":"?????????","city":"?????????","district":"?????????","adcode":"330104","street":"????????????","street_number":"","direction":"","distance":""},"pois":[{"addr":"????????????5277???","cp":" ","direction":"???","distance":"68","name":"????????????????????????????????????","poiType":"????????????","point":{"x":120.25084961536486,"y":30.3112150
  • Scala代码实现“访问hive,并保存结果到hive表”的spark任务:

开发工具为IDEA16,开发语言为scala,开发包有了spark对应集群版本下的很多个jar包,和对应集群版本下的很多个jar包,引入jar包:

scala代码:

import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import java.util
import java.util.{UUID, Calendar, Properties}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.sql.{Row, SaveMode, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{sql, SparkContext, SparkConf}
import org.apache.spark.sql.DataFrameHolder

/**
  * temp http_content
  **/
case class Temp_Http_Content_ParserResult(success: String, lnglatType: String, longitude: String, Latitude: String, radius: String)

/**
  * Created by Administrator on 2016/11/15.
  */
object ParserMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    //.setAppName("XXX_ParserHttp").setMaster("local[1]").setMaster("spark://172.21.7.10:7077").setJars(List("xxx.jar"))
        //.set("spark.executor.memory", "10g")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // use abc_hive_db;
    hiveContext.sql("use abc_hive_db")
    // error date format:2016-11-15,date format must be 20161115
    val rdd = hiveContext.sql("select host,http_content from default.http where hour>=‘20161115‘ and hour<‘20161116‘")

    // toDF() method need this line...
    import hiveContext.implicits._

    // (success, lnglatType, longitude, latitude, radius)
    val rdd2 = rdd.map(s => parse_http_context(s.getAs[String]("host"), s.getAs[String]("http_content"))).filter(s => s._1).map(s => Temp_Http_Content_ParserResult(s._1.toString(), s._2, s._3, s._4, s._5)).toDF()
    rdd2.registerTempTable("Temp_Http_Content_ParserResult_20161115")
    hiveContext.sql("create table Temp_Http_Content_ParserResult20161115 as select * from Temp_Http_Content_ParserResult_20161115")

    sc.stop()
  }

  /**
    * @ summary: 解析http_context字段信息
    * @ param http_context 参数信息
    * @ result 1:是否匹配成功;
    * @ result 2:匹配出的是什么经纬度的格式:
    * @ result 3:经度;
    * @ result 4:纬度,
    * @ result 5:radius
    **/
  def parse_http_context(host: String, http_context: String): (Boolean, String, String, String, String) = {
    if (host == null || http_context == null) {
      return (false, "", "", "", "")
    }

    //    val result2 = parse_http_context(“api.map.baidu.com”,"renderReverse&&renderReverse({\"status\":0,\"result\":{\"location\":{\"lng\":120.25088311933617,\"lat\":30.310684375444877},\"formatted_address\":\"???????????????????????????????????????\",\"business\":\"\",\"addressComponent\":{\"country\":\"??????\",\"country_code\":0,\"province\":\"?????????\",\"city\":\"?????????\",\"district\":\"?????????\",\"adcode\":\"330104\",\"street\":\"????????????\",\"street_number\":\"\",\"direction\":\"\",\"distance\":\"\"},\"pois\":[{\"addr\":\"????????????5277???\",\"cp\":\" \",\"direction\":\"???\",\"distance\":\"68\",\"name\":\"????????????????????????????????????\",\"poiType\":\"????????????\",\"point\":{\"x\":120.25084961536486,\"y\":30.3112150")
    //    println(result2._1 + ":" + result2._2 + ":" + result2._3 + ":" + result2._4 + ":" + result2._5)

    var success = false
    var lnglatType = ""
    var longitude = ""
    var latitude = ""
    var radius = ""
    var lowerCaseHost = host.toLowerCase().trim();
    val lowerCaseHttp_Content = http_context.toLowerCase()
    //    api.map.baidu.com
    //    "result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},
    //    "confidence":25
    //     --renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"formatted_address":"???????????????????????????????????????","business":"","addressComponent":{"country":"??????","country_code":0,"province":"?????????","city":"?????????","district":"?????????","adcode":"330104","street":"????????????","street_number":"","direction":"","distance":""},"pois":[{"addr":"????????????5277???","cp":" ","direction":"???","distance":"68","name":"????????????????????????????????????","poiType":"????????????","point":{"x":120.25084961536486,"y":30.3112150
    if (lowerCaseHost.equals("api.map.baidu.com")) {
      val indexLng = lowerCaseHttp_Content.indexOf("\"lng\"")
      val indexLat = lowerCaseHttp_Content.indexOf("\"lat\"")
      if (lowerCaseHttp_Content.indexOf("\"location\"") != -1 && indexLng != -1 && indexLat != -1) {
        var splitstr: String = "\\,|\\{|\\}"
        var uriItems: Array[String] = lowerCaseHttp_Content.split(splitstr)
        var tempItem: String = ""
        lnglatType = "BD"
        success = true
        for (uriItem <- uriItems) {
          tempItem = uriItem.trim()
          if (tempItem.startsWith("\"lng\":")) {
            longitude = tempItem.replace("\"lng\":", "").trim()
          } else if (tempItem.startsWith("\"lat\":")) {
            latitude = tempItem.replace("\"lat\":", "").trim()
          } else if (tempItem.startsWith("\"confidence\":")) {
            radius = tempItem.replace("\"confidence\":", "").trim()
          }
        }
      }
    }
    else if (lowerCaseHost.equals("loc.map.baidu.com")) {
      。。。
    }

    longitude = longitude.replace("\"", "")
    latitude = latitude.replace("\"", "")
    radius = radius.replace("\"", "")

    (success, lnglatType, longitude, latitude, radius)
  }
}

打包,注意应为我们使用的hadoop&hive&spark on yarn的集群,我们这里并不需要想spark&hadoop一样还需要在执行spark-submit时将spark-hadoop-xx.jar打包进来,也不需要在submit-spark脚本.sh中制定jars参数,yarn会自动诊断我们需要哪些集群系统包;但是,如果你应用的是第三方的包,比如ab.jar,那打包时可以打包进来,也可以在spark-submit 参数jars后边指定特定的包。

  • 写spark-submit提交脚本.sh:

  • 当执行spark-submit脚本出现错误时,怎么应对呢?

注意,我们这里不是spark而是spark on yarn,当我们使用yarn-cluster方式提交时,界面是看不到任何日志新的。我们需要借助yarn管理系统来查看日志:

1、根据返回的任务id查看历史日志:

yarn logs -applicationId  application_1475071482566_3329402

2、yarn页面查看日志

https://xx.xx.xx.xx:xxxxx/Yarn/ResourceManager/xxxx/cluster

用户名/密码:user/password

参考资料:

http://blog.csdn.net/sparkexpert/article/details/50964732

时间: 2024-10-10 22:11:18

Spark&Hive:如何使用scala开发spark作业,并访问hive。的相关文章

使用scala开发spark入门总结

使用scala开发spark入门总结 一.spark简单介绍 关于spark的介绍网上有很多,可以自行百度和google,这里只做简单介绍.推荐简单介绍连接:http://blog.jobbole.com/89446/ 1.    spark是什么? Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架.一般配合hadoop使用,可以增强hadoop的计算性能. 2.    Spark的优点有哪些? Sp

搭建scala 开发spark程序环境及实例演示

上一篇博文已经介绍了搭建scala的开发环境,现在进入正题.如何开发我们的第一个spark程序. 下载spark安装包,下载地址http://spark.apache.org/downloads.html(因为开发环境需要引用spark的jar包) 我下载的是spark-2.1.0-bin-hadoop2.6.tgz,因为我的scalaIDE版本是scala-SDK-4.5.0-vfinal-2.11-win32.win32.x86_64.zip 最好,IDE版本和spark版本要匹配,否则,开

底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1.一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交:2.Scala学习角度讲,比Java难.找Scala的高手比Java难,项目的维护和二次开发比较困难:3.很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combat

spark性能调优:开发调优

在大数据计算领域,Spark已经成为了越来越流行.越来越受欢迎的计算平台之一.Spark的功能涵盖了大数据领域的离线批处理.SQL类处理.流式/实时计算.机器学习.图计算等各种不同类型的计算操作,应用范围与前景非常广泛. 然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的.如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来.因此,想要用好Spark,就必须对其进行合理的性能优化. Spark的

IntelliJ IDEA开发Spark的Maven项目Scala语言

1.Maven管理项目在JavaEE普遍使用,开发Spark项目也不例外,而Scala语言开发Spark项目的首选.因此需要构建Maven-Scala项目来开发Spark项目,本文采用的工具是IntelliJ IDEA 2016,IDEA工具越来越被大家认可,开发java, python ,scala 支持都非常好,安装直接下一步即可. 2.安装scala插件,File->Settings->Editor->Plugins,搜索scala即可安装. 3.创建Maven工程,File-&g

Spark3000门徒第9课IDEA中开发Spark实战总结

今晚听了王家林老师的第9课IDEA中开发Spark实战,课后作业是:在Idea中编写广告点击排名的程序并提交集群测试, IDEA社区版本就够用,Ultimate没必要还要钱 程序如下: object WordCountCluster { def main(args: Array[String]){ /** * 第一步:创建spark的配置对象SparkConf,设置Spark程序的运行时的配置信息 * */ val conf = new SparkConf() //创建SparkConf对象 c

Spark3000门徒第10课Java开发Spark实战总结

今晚听了王家林老师的第10课Java开发Spark实战,课后作业是:用Java方式采用Maven开发Spark的WordCount并运行在集群中 先配置pom.xml <groupId>com.dt.spark</groupId> <artifactId>SparkApps</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging

Spark3000门徒第8课IDE中开发Spark实战总结

今晚听了王家林老师的第8课IDE中开发Spark实战,课后作业是:在Eclipse中编写广告点击排名的程序并测试,程序如下: object WordCountCluster { def main(args: Array[String]){ /** * 第一步:创建spark的配置对象SparkConf,设置Spark程序的运行时的配置信息 */ val conf = new SparkConf() //创建SparkConf对象 conf.setAppName("First App")

Spark 1.0.0企业级开发动手:实战世界上第一个Spark 1.0.0课程,涵盖Spark 1.0.0所有的企业级开发技术

课程介绍 2014年5月30日发布了Spark 1.0.0版本,而本课程是世界上第一个Spark1.0.0企业级实践课程,课程包含Spark的架构设计.Spark编程模型.Spark内核框架源码剖析.Spark的广播变量与累加器.Shark的原理和使用.Spark的机器学习.Spark的图计算GraphX.Spark SQL.Spark实时流处理.Spark的优化.Spark on Yarn.JobServer等Spark 1.0.0所有的核心内容 最后以一个商业级别的Spark案例为基础,实战