spark HelloWorld程序(scala版)

使用本地模式,不需要安装spark,引入相关JAR包即可:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency> 

创建spark:

        val sparkUrl = "local"
        val conf = new SparkConf()
                //.setJars(Seq("/home/panteng/IdeaProjects/sparkscala/target/spark-scala.jar"))
                .set("fs.hdfs.impl.disable.cache", "true")
                .set("spark.executor.memory", "8g")

        val spark = SparkSession
                .builder()
                .appName("Spark SQL basic example")
                .config(conf)
                .config("spark.some.config.option", "some-value")
                .master(sparkUrl)
                .getOrCreate()    

加载本地文件:

val parquetFileDF = spark.read.parquet("/home/panteng/下载/000001_0")
            //spark.read.parquet("hdfs://10.38.164.80:9000/user/root/000001_0")

文件操作:

parquetFileDF.createOrReplaceTempView("parquetFile")

val descDF = spark.sql("SELECT substring(description,0,3) as pre ,description FROM parquetFile LIMIT 100000")
val diffDesc = descDF.distinct().sort("description")
diffDesc.createOrReplaceTempView("pre_desc")
val zhaoshang = spark.sql("select * from pre_desc")
zhaoshang.printSchema()

遍历处理:

zhaoshang.foreach(row => clustering(row))
val regexRdd = spark.sparkContext.parallelize(regexList)
regexRdd.repartition(1).saveAsTextFile("/home/panteng/下载/temp6")

spark.stop()

附其他函数:

def clustering(row: Row): String = {
        try {
            var tempRegex = new Regex("null")
            if (textPre.equals(row.getAs[String]("pre"))) {
                textList = row.getAs[String]("description").replaceAll("\\d","0") :: textList
                return "continue"
            } else {
                if (textList.size > 2) {
                    tempRegex = ScalaClient.getRegex(textList)
                    regexList = tempRegex :: regexList
                }
                if (row.getAs[String]("pre") != null && row.getAs[String]("description") != null) {
                    textPre = row.getAs[String]("pre")
                    textList = textList.dropRight(textList.size)
                    textList = row.getAs[String]("description") :: textList
                }
                return "ok - " + tempRegex.toString()
            }
        } catch {
            case e: Exception => println("kkkkkkk" + e)
        }
        return "error"
    }

package scala.learn

import top.letsgogo.rpc.ThriftProxy

import scala.util.matching.Regex

object ScalaClient {
    def main(args: Array[String]): Unit = {
        val client = ThriftProxy.client
        val seqList = List("您尾号9081的招行账户入账人民币689.00元",
            "您尾号1234的招行一卡通支出人民币11.00元",
            "您尾号2345的招行一卡通支出人民币110.00元",
            "您尾号5432的招行一卡通支出人民币200.00元",
            "您尾号5436的招行一卡通入账人民币142.00元")
        var words: List[String] = List()
        for (seq <- seqList) {
            val list = client.splitSentence(seq)
            for (wordIndex <- 0 until list.size()) {
                words = list.get(wordIndex) :: words
            }
        }
        val wordlist = words.map(word => (word, 1))
        //方法一:先groupBy再map
        var genealWords: List[String] = List()
        wordlist.groupBy(_._1).map {
            case (word, list) => (word, list.size)
        }.foreach((row) => {
            (if (row._2 >= seqList.size) genealWords = row._1 :: genealWords)
        })

        val list = client.splitSentence("您尾号1234的招行一卡通支出人民币200.00元")
        val regexSeq: StringBuilder = new StringBuilder
        val specialChar = List("[", "]", "(", ")")
        for (wordIndex <- 0 until list.size()) {
            var word = list.get(wordIndex)
            if (genealWords.contains(word) && !("*".equals(word))) {
                if (specialChar.contains(word.mkString(""))) {
                    word = "\\" + word
                }
                regexSeq.append(word)
            } else {
                regexSeq.append("(.*)")
            }
        }
        println(regexSeq)
        val regex = new Regex(regexSeq.mkString)
        for (seq <- seqList) {
            println(regex.findAllIn(seq).isEmpty)
        }
    }

    def getRegex(seqList: List[String]) = {
        val client = ThriftProxy.client
        var words: List[String] = List()
        for (seq <- seqList) {
            val list = client.splitSentence(seq)
            for (wordIndex <- 0 until list.size()) {
                words = list.get(wordIndex) :: words
            }
        }
        val wordlist = words.map(word => (word, 1))
        //方法一:先groupBy再map
        var genealWords: List[String] = List()
        wordlist.groupBy(_._1).map {
            case (word, list) => (word, list.size)
        }.foreach((row) => {
            (if (row._2 >= seqList.size) genealWords = row._1 :: genealWords)
        })

        val list = client.splitSentence(seqList(0))
        val regexSeq: StringBuilder = new StringBuilder
        val specialChar = List("[", "]", "(", ")")
        for (wordIndex <- 0 until list.size()) {
            var word = list.get(wordIndex)
            if (genealWords.contains(word) && !("*".equals(word))) {
                if (specialChar.contains(word.mkString(""))) {
                    word = "\\" + word
                }
                regexSeq.append(word)
            } else {
                if(regexSeq.size > 4) {
                    val endStr = regexSeq.substring(regexSeq.size - 4, regexSeq.size - 0)
                    if (!"(.*)".equals(endStr)) {
                        regexSeq.append("(.*)")
                    }
                }else{
                    regexSeq.append("(.*)")
                }
            }
        }
        println(regexSeq + "  " + seqList.size)
        val regex = new Regex(regexSeq.mkString.replaceAll("0+","\\\\d+"))
        //for (seq <- seqList) {
        //    println(regex.findAllIn(seq).isEmpty)
        //}
        regex
    }
}

批量数据提取正则

时间: 2024-10-26 07:23:09

spark HelloWorld程序(scala版)的相关文章

Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析.然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索).最后两节将开始通过命令行与Spark进行交互,然后演示如何用Python写Spark应用,并作为Spark作业提交到集群上.同时也会提供相应的 Scala 版本. 1.设置Spark环境 在本机设置和运行Spark非常简单.你只需要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就可以在Windows.Mac O

Scala学习2 ———— 三种方式完成HelloWorld程序

三种方式完成HelloWorld程序 分别采用在REPL,命令行(scala脚本)和Eclipse下运行hello world. 一.Scala REPL. 按照第一篇在windows下安装好scala后,直接Ctrl+R,然后在运行命令窗里输入scala,或者输入cmd后,进入命令行在输入scala. 然后我们输入 print("Hello World!") 看下结果: 第一种方式运行完毕. 注意:前两行命令使用了Tab键,可以像bash一样有补全的功能哦! 二.Scala脚本完成H

Spark修炼之道(高级篇)——Spark源码阅读:第一节 Spark应用程序提交流程

作者:摇摆少年梦 微信号: zhouzhihubeyond spark-submit 脚本应用程序提交流程 在运行Spar应用程序时,会将spark应用程序打包后使用spark-submit脚本提交到Spark中运行,执行提交命令如下: root@sparkmaster:/hadoopLearning/spark-1.5.0-bin-hadoop2.4/bin# ./spark-submit --master spark://sparkmaster:7077 --class SparkWordC

Object-C 入门 Xcode 环境详解 HelloWorld 程序

作者 : 韩曙亮 转载请注明出处 : http://blog.csdn.net/shulianghan/article/details/38424965 一. Xcode 环境安装 与 工程创建 1. 下载环境 相关资源下载 : -- IOS 相关资料下载页面 : https://developer.apple.com/devcenter/ios/index.action ; -- Xcode 下载页面 : https://developer.apple.com/xcode/downloads/

最简单的基于FFMPEG的Helloworld程序

===================================================== 最简单的基于FFmpeg的视频播放器系列文章列表: 100行代码实现最简单的基于FFMPEG+SDL的视频播放器(SDL1.x) 最简单的基于FFMPEG+SDL的视频播放器 ver2 (採用SDL2.0) 最简单的基于FFmpeg的解码器-纯净版(不包括libavformat) 最简单的基于FFMPEG+SDL的视频播放器:拆分-解码器和播放器 最简单的基于FFMPEG的Hellowor

【转】Kafka producer原理 (Scala版同步producer)

转载自:http://www.cnblogs.com/huxi2b/p/4583249.html     供参考 本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性produc

使用Java编写并运行Spark应用程序

本文转载自:http://shiyanjun.cn/archives/742.html 我们首先提出这样一个简单的需求:现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况.这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http:

【原创】Kafka producer原理 (Scala版同步producer)

本文分析的Kafka代码为kafka-0.8.2.1.另外,由于Kafka目前提供了两套Producer代码,一套是Scala版的旧版本:一套是Java版的新版本.虽然Kafka社区极力推荐大家使用Java版本的producer,但目前很多已有的程序还是调用了Scala版的API.今天我们就分析一下旧版producer的代码. producer还分为同步和异步模式,由属性producer.type指定,默认是sync,即同步发送模式.本文也主要关注于同步发送的代码走读.下面以console-pr

【Activiti:学以致用】【第三章】Activiti的HelloWorld程序(Activiti modeler画图教学)

前两章 [Activiti:学以致用][第一章] 工作流核心API [Activiti:学以致用][第二章]Activiti的配置文件XML整合与十万个为什么 这章开始来个简单HelloWorld程序来热热身子,激动一下.... 首先我们先来画个流程图(我用的是京缘网络提供的在线流程设计器,好像是基于原生的activiti改造的),至于怎么画?看下图吧 点击第一个事件(对,就是圆形那个),填写属性值分别id是start和名称是开始,相对的另外一个就是id是end和名称是结束了 简单的画好图,然后