spark 省份次数统计实例

//统计access.log文件里面IP地址对应的省份,并把结果存入到mysql

package access1

import java.sql.DriverManager

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}

object AccessIp {
  def main(args: Array[String]): Unit = {
    //new sc
    val conf = new SparkConf ()
      .setAppName ( this.getClass.getSimpleName )
      .setMaster ( "local[*]" )
    val sc = new SparkContext ( conf )

    //读取数据
    val accesslines = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\access.log" )
    val iplines = sc.textFile ( "D:\\学习笔记\\资料汇总\\day02\\资料\\省份次数统计的数据\\ip.txt" )

    //处理数据
    val ip1 = iplines.map ( tp => {
      val splits = tp.split ( "[|]" )
      val start = splits ( 2 ).toLong
      val end = splits ( 3 ).toLong
      val province = splits ( 6 )
      (start, end, province)
    } ).collect ()

    //广播变量(这里使用是不对,当数据使用三次的时候,在使用广播变量,否则会占内存)
    val broads: Broadcast[Array[(Long, Long, String)]] = sc.broadcast ( ip1 )

    //处理数据
    val result2 = accesslines.map ( tp => {
      val splits = tp.split ( "[|]" )
      val ip = splits ( 1 )
      val ips = MyUtils.ip2Long ( ip )
      val valiues: Array[(Long, Long, String)] = broads.value
      val index = MyUtils.binarSearch ( valiues, ips )
      var province = ""

      if (index != -1) {
        province = valiues ( index )._3
      }
      (province, 1)
    } ).reduceByKey ( _ + _ ).sortBy ( -_._2 )

    //写入mysql
    result2.foreachPartition ( filter => {
      //获取mysql的链接
      val connection = DriverManager.getConnection ( "jdbc:mysql://localhost:3306/test1?characterEncoding=UTF-8&serverTimezone=GMT%2B8", "root", "123456" )
      filter.foreach ( tp => {
        val ps = connection.prepareStatement ( "insert into suibian values(?,?)" )

        //设置参数
        ps.setString ( 1, tp._1 )
        ps.setInt ( 2, tp._2 )

        //提交
        ps.executeLargeUpdate ()
        ps.close ()
      } )
      connection.close ()
    } )
    sc.stop ()
    broads.unpersist ( true )
  }
}
package access1

object MyUtils {
  //ip地址转换为lang类型
  def ip2Long(ip: String): Long = {
    val fragments = ip.split ( "[.]" )
    var ipNum = 0L
    for (i <- 0 until fragments.length) {
      ipNum = fragments ( i ).toLong | ipNum << 8L
    }
    ipNum
  }

  //二分查找法
  def binarSearch(array: Array[(Long, Long, String)], target: Long): Int = {
    var low = 0
    var high = array.length - 1

    while (low <= high) {
      var mid = low + ( high - low ) / 2
      if (array ( mid )._1 <= target && array ( mid )._2 >= target) {
        return mid
      } else if (array ( mid )._1 > target) {
        high = mid - 1
      } else {
        low = mid + 1
      }
    }
    return -1
  }
}

原文地址:https://www.cnblogs.com/wangshuang123/p/11082113.html

时间: 2024-10-12 15:06:32

spark 省份次数统计实例的相关文章

scala实战之spark用户在线时长和登录次数统计实例

接触spark后就开始学习scala语言了,因为有一点python和java的基础学习起来还行,今天在这里把我工作中应用scala编程统计分析用户行为日志的实例和大家分析一下,我这里主要讲一下用户的在线时长统计和登录次数统计算法实现过程. 第一步 编程环境:首先你得有spark安装包 你可以先不用本地安装spark,但是可以通过import spark-assembly-1.6.2-hadoop2.6.0.jar包来完成程序调试 另外需要scala的运行环境,我用的版本:scala-sdk-2.

组合数据类型练习,英文词频统计实例上

字典实例:建立学生学号成绩字典,做增删改查遍历操作. #创建一个空字典 dict={} s=dict print(s) #增加键值对(学号-成绩) s['001']=60 s['002']=70 s['003']=80 s['004']=90 print(s) #删除 s.pop('004') print(s) #修改 s['001']=69 print(s) #查找键是否存在 s.get('005','不存在') print(s) #便历 for i in s: print(i) 2.列表,元

组合数据类型练习,英文词频统计实例

1.由字符串创建一个作业评分表,做增删改查询统计遍历操作,例如查询第一个3分的下标,统计1分的同学有几个,3分的同学有几个,增删改查等等. 2.字典实例:建立学生学号成绩字典,做增删改查遍历操作. 3.列表,元组,字典,集合的遍历. 4.英文词频统计实例 news = '''When I was young I'd listen to the radio Waiting for my favorite songs When they played I'd sing along, It make

组合数据类型和英文词频统计实例

1.列表实例:由字符串创建一个作业评分列表,做增删改查询统计遍历操作.例如,查询第一个3分的下标,统计1分的同学有多少个,3分的同学有多少个等. >>> ls=list('1231323232323131323') >>> ls ['1', '2', '3', '1', '3', '2', '3', '2', '3', '2', '3', '2', '3', '1', '3', '1', '3', '2', '3'] >>> ls.append('4'

spark中的SparkContext实例的textFile使用的小技巧

网上很多例子,包括官网的例子,都是用textFile来加载一个文件创建RDD,类似sc.textFile("hdfs://n1:8020/user/hdfs/input") textFile的参数是一个path,这个path可以是: 1. 一个文件路径,这时候只装载指定的文件 2. 一个目录路径,这时候只装载指定目录下面的所有文件(不包括子目录下面的文件) 3. 通过通配符的形式加载多个文件或者加载多个目录下面的所有文件 第三点是一个使用小技巧,现在假设我的数据结构为先按天分区,再按小

组合数据类型练习,英文词频统计实例上列表,元组,字典,集合的遍历。 总结列表,元组,字典,集合的联系与区别。

1.字典实例:建立学生学号成绩字典,做增删改查遍历操作. d={'天':95,'青':78,'色':65,'等':66}print('学生成绩字典:',d)d['烟']=98print('增加:',d)d.pop('等')print('删除:',d)d['天']=78print('修改:',d)print('查询青成绩:',d.get('青','无')) 2.列表,元组,字典,集合的遍历.总结列表,元组,字典,集合的联系与区别. s=list('123456789')t=set('7564231

八、组合数据类型练习,英文词频统计实例上

1.字典实例:建立学生学号成绩字典,做增删改查遍历操作. dict={'001':'66','003':'77','006':'88','009':'99'} print('学生学号成绩:',dict) dict['007']=96 print('增加学号为007的学生的成绩为96:',dict) dict.pop('001') print('删除学号为001的学生的记录:',dict) dict['007']=100 print('修改学号为007的学生的成绩为100:',dict) prin

作业8-组合数据类型练习,英文词频统计实例上

1.字典实例:建立学生学号成绩字典,做增删改查遍历操作. 总结列表,元组,字典,集合的联系与区别. 运行结果: 2.列表,元组,字典,集合的遍历,总结列表,元组,字典,集合的联系与区别. 运行结果: 区别: 列表用"[]"表示,列表是可变的数据类型,即这种类型是可以被改变的,并且列表是可以嵌套的. 元组用"()"表示,元祖和列表十分相似,不过元组是不可变的,但也可以嵌套. 字典用"{}"表示,注意它们的键/值对用冒号分割,而各个对用逗号分割,所有

Spark MLlib Statistics统计

1.Spark MLlib Statistics统计 Spark Mllib 统计模块代码结构如下: 1.1 列统计汇总 计算每列最大值.最小值.平均值.方差值.L1范数.L2范数. //读取数据,转换成RDD[Vector]类型 val data_path = "/home/jb-huangmeiling/sample_stat.txt" val data = sc.textFile(data_path).map(_.split("\t")).map(f =>