第一个spark+scala程序

import org.apache.spark._
import SparkContext._
import java.util.{Calendar,Properties,Date,Locale}
import java.text.SimpleDateFormat

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.Formatter;

//热度-订阅数 2 发文频率 3 文章质量 5
//最高是十分 >10 也是=10

object WordCount {
//http://blog.chinaunix.net/uid-25885064-id-3430852.html

//scala时间处理-获取今天日期,昨天日期,本周时间,本月时间,时间戳转换日期,时间比较
//http://blog.csdn.net/springlustre/article/details/47273353

//update xrk_wx_openaccounts set hscore=‘‘ where openid=‘‘;

//fscore
//qscore

//update xrk_wx_openaccounts set fscore=‘‘,qscore=‘‘ where openid=‘‘;

def format1(value:Double ):String ={

var bd:BigDecimal = new BigDecimal(value)

bd = bd.setScale(2, RoundingMode.HALF_UP)

return bd.toString();

}

def rethscoreSql(hscore:Double,openid:String):String={

var sql:String="update xrk_wx_openaccounts set hscore=‘"+format1(hscore)+"‘ where openid=‘"+openid+"‘;"
sql

}

def retfscoreqscoreSql(fscore:Double,qscore:Double,openid:String):String={

var sql:String="update xrk_wx_openaccounts set fscore=‘"+format1(fscore)+"‘,qscore=‘"+format1(qscore)+"‘ where openid=‘"+openid+"‘;"
sql

}

//今天
def getNowDate():String={
var now:Date = new Date()
var dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
var NowTime = dateFormat.format( now )
NowTime
}
//获取昨天的日期
def getYesterday():String={
var dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var cal:Calendar=Calendar.getInstance()
cal.add(Calendar.DATE,-1)
var yesterday=dateFormat.format(cal.getTime())
yesterday
}

//获取7天前的日期
def get_7day():String={
var dateFormat:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
var cal:Calendar=Calendar.getInstance()
cal.add(Calendar.DATE,-7)
var yesterday=dateFormat.format(cal.getTime())
yesterday
}

// 字符串 转成时间
def strtoDate(tm:String):Date={
//val loc = new Locale("en")
// val fm = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss",loc)

val fm = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
// val tm = "30/Jul/2015:05:00:50"
val dt2 = fm.parse(tm);
dt2
}

// 字符串 转成时间戳
def strtoDatetolong(tm:String):Long={
val dt= strtoDate(tm)
val ldt=dt.getTime()
ldt
}

def main(args: Array[String]) {
/*
* 总文章数 TotalArticle
总点击数 TotalClick
总阅读数 TotalReadNum
订阅号总数TotalOpenNum

平均文章数AvgArticle
平均点击数AvgClick
平均阅读数AvgReadNum

每个订阅号的文章数量OpenArticle
每个订阅号的总点击数OpenClick
每个订阅号的总阅读数OpenReadNum

每个订阅号的平均文章数量AvgOpenArticle
每个订阅号的平均击数AvgOpenClick
每个订阅号的平均阅读数AvgOpenReadNum
*
*
* */

if (args.length < 4 ){
println(" spark://192.168.16.119:7077 SparkSubmit_Demo ")
println(" /wxcontentdb/xrk_wx_articles/part-m-00000")
println(" /outtxt")
println(args.length.toString())
println(args(0))
return
}

def _float(line:String):Int={
val fileds = line.split("\t")
val timeLong=strtoDatetolong(fileds(3))
val _7dayTime=strtoDatetolong(get_7day())

if(timeLong>_7dayTime) 1 else 0

}

//大于10 小于 0.1

def fenzhi(fenzi:Double,fenmu:Double):Double={

var __fenzhi:Double=0.00

__fenzhi=(fenzi/fenmu)

if(__fenzhi>10){ 10.00
}else if(__fenzhi<0.1){ 0.00
}else{format1(__fenzhi).toDouble}

}

val conf = new SparkConf()
.setMaster(args(0))
.setAppName(args(1))
.set("spark.executor.memory", "3g")
val sc = new SparkContext(conf)

val xrk_wx_userorder = sc.textFile(args(3))//xrk_wx_userorder
//总条数xrk_wx_userorder_total_num
//总记录xrk_wx_userorder_total_record
//平均值xrk_wx_userorder_avg

val xrk_wx_userorder_total_num=xrk_wx_userorder.count()

val openid_num=xrk_wx_userorder.map(line => {val fileds = line.split("\t") ;( fileds(2))}).map((_,1)).reduceByKey(_ + _).map(x=>(x._2, x._1)).sortByKey(true).map(x=>(x._2,x._1))

val xrk_wx_userorder_total_record=openid_num.count()
val xrk_wx_userorder_avg = (xrk_wx_userorder_total_num/xrk_wx_userorder_total_record).toLong

val openid_num_ex=openid_num.map(x=>(x._1, x._2,xrk_wx_userorder_avg,fenzhi(x._2,xrk_wx_userorder_avg)))

/////////////////////////
val lines = sc.textFile(args(2))//wxcontentdb

//openid+time

val openid_time=lines.map(line => {val fileds = line.split("\t") ;( fileds(1)+"\t"+fileds(3))}).map((_,1)).reduceByKey(_ + _)

val _openid_time=openid_time.map(x=>(x._1.split("\t")(0))).map((_,1)).reduceByKey(_ + _).keyBy(top=>top._1)
//

val TotalArticle=lines.count()

val TotalClick= lines.map(line => {val fileds = line.split("\t") ;( fileds(4).toLong)}).reduce((a,b) => a+b)

val TotalReadNum= lines.map(line => {val fileds = line.split("\t") ;( fileds(5).toLong)}).reduce((a,b) => a+b)

val OpenArticle = lines.map(_.split("\t")(1)).map((_,1)).reduceByKey(_ + _)

val TotalOpenNum=OpenArticle.count()

val OpenClick=lines.map(line => {val fileds = line.split("\t") ;( fileds(1).toString(),fileds(4).toLong)}).reduceByKey(_ + _)
val OpenReadNum=lines.map(line => {val fileds = line.split("\t") ;( fileds(1).toString(),fileds(5).toLong)}).reduceByKey(_ + _)

//val txt= OpenArticle.map(x=>(x._2, x._1)).sortByKey(true).map(x=>(x._2,x._1))
val _OpenClick=OpenClick.keyBy(top=>top._1)
val _OpenReadNum=OpenReadNum.keyBy(top=>top._1)

val list= OpenArticle.keyBy(top=>top._1).join(_OpenClick).join(_OpenReadNum).join(_openid_time).map(f => (f._1, f._2._1._1._1._2, f._2._1._1._2._2, f._2._1._2._2, f._2._2._2))

val AvgClick=TotalClick/TotalOpenNum
val AvgReadNum=TotalReadNum/TotalOpenNum

// val txt=list.map(f =>(f._1,f._2,f._3,f._4,f._5,TotalArticle,TotalOpenNum,TotalClick,TotalReadNum));
val txt=list.map(f =>(f._1,fenzhi(f._3,AvgClick),fenzhi(f._4,AvgReadNum), fenzhi(f._2*10,8*14)/2.00+f._5*10.00/14.00/2.00 ))
//.keyBy(top=>top._2).sortByKey(true)
val _txt=txt.map(f =>{retfscoreqscoreSql((f._2+f._3)/2,f._4,f._1)})
val _openid_num_ex=openid_num_ex.map(f =>{rethscoreSql(f._4,f._1)})

// val _txt__openid_num_ex=_txt+"\n"+_openid_num_ex
// 文章数 点击 阅读 发文
//.map((_,TotalArticle,TotalClick,TotalReadNum))

//openid_num_ex.saveAsTextFile(args(4))
_openid_num_ex.saveAsTextFile(args(4))
_txt.saveAsTextFile(args(5))
sc.stop()

//val beginnow =new Date();
//val mbegindate = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") format beginnow

}
}

////////////////////////////////////////////////////////////提交///////////////////////////////////////////

/spark-1.0.2/bin/spark-submit  --class WordCount  spark-wordcount-in-scala.jar    spark://192.168.16.119:7077  SparkSubmit_Demo    /user/root/wxcontentdb/part-m-00000 /user/root/xrk_wx_userorder/part-m-00000    outtxt1 outtxt2  –num-workers 1 –master-memory 2g –worker-memory 2g

时间: 2024-10-10 02:54:04

第一个spark+scala程序的相关文章

[大数据从入门到放弃系列教程]第一个spark分析程序

文章施工中,由于部分网站会在我还没有写完就抓取到这篇文章,导致你看到的内容不完整,请点击这里: 或者复制访问 http://www.cnblogs.com/blog5277/p/8580007.html 来查看更完整的内容 [大数据从入门到放弃系列教程]第一个spark分析程序 原文链接:http://www.cnblogs.com/blog5277/p/8580007.html 原文作者:博客园--曲高终和寡 *********************分割线******************

寒假学习记录2_Scala解释器的使用以及Scala程序的编译执行

Scala解释器的使用: 由于在前面的安装过程中,已经自动设置了path变量,所以不需要给出scala命令的路径全称,在命令提示符终端中输入“scala”命令便会进入scala命令行提示符状态(即“scala>”),可以在后面输入命令. 运行Scala解释器以后,就可以测试了.输入一条语句,解释器会立即执行语句并返回结果,这就是REPL(Read-Eval-Print Loop,交互式解释器).为我们提供了交互式执行环境,表达式计算完成就会输出结果,而不必等到整个程序运行完毕,因此可即时查看中间

scala 入门Eclipse环境搭建及第一个入门经典程序HelloWorld

IDE选择并下载: scala for eclipse 下载: http://scala-ide.org/download/sdk.html 根据自己的机器配置选择合适的IDE: 我这里选择For scala2.11 版本的Windows 32 bit的IDE,单击即下载. scala安装: 安装包下载地址,进入官网:http://www.scala-lang.org/ 进入DOWNLOAD下,选择scala 2.11 版本,单击下载: Windows上安装scala 2.11: 单击运行sca

IntelliJ IDEA 第一个 Scala 程序

IntelliJ 安装完成 Scala 插件后,你需要尝试使用 IntelliJ 来创建并且运行第一个程序. 通常这个程序只是简单的输出 Hello World. 创建一个新工程 在文件下面选择新建,然后选择创建工程. 输入工程信息 在创建工程上面,输入工程的信息,指定工程的路径等. 在这里特别需要注意的地方是创建工程的 Scala SDK 配置,你需要现在你的系统中配置 SDK,有关配置 SDK 的方法,请参考页面Scala 安装及环境配置中的内容. 选择 SDK 单击创建后,你将会看到需要选

Spark学习(三)Scala程序例子

例一:对目录下的单词文件进行单词统计 /word/first.txt:                                                           /word/second.txt:                                                /word/third.txt:         运行结果: import java.io.File; import java.io.PrintWriter; import scal

【知乎】怎么成为一个优秀的程序员,而不是一个优秀的码农?

怎么成为一个优秀的程序员,而不是一个优秀的码农? 9 条评论 分享 默认排序按时间排序 98 个回答 3844赞同反对,不会显示你的姓名 萧井陌 微信公众号:炼瓜研究所 技术社区 - 3844 人赞同 优秀的程序员会告诉你打根基的重要性,会劝你在厚积薄发前要隐忍. 优秀的码农会告诉你学啥底层.啥啥啥一拖就好了,学了python还要啥自行车啊,数据结构排序函数二分搜索这不都内置了吗?工作中永远用不到,学算法有啥用啊?成为高手有很多种方法汇编是个屁啊? +++基础的分割线+++ 列举几个我认为比较重

Spark 1.0.0企业级开发动手:实战世界上第一个Spark 1.0.0课程,涵盖Spark 1.0.0所有的企业级开发技术

课程介绍 2014年5月30日发布了Spark 1.0.0版本,而本课程是世界上第一个Spark1.0.0企业级实践课程,课程包含Spark的架构设计.Spark编程模型.Spark内核框架源码剖析.Spark的广播变量与累加器.Shark的原理和使用.Spark的机器学习.Spark的图计算GraphX.Spark SQL.Spark实时流处理.Spark的优化.Spark on Yarn.JobServer等Spark 1.0.0所有的核心内容 最后以一个商业级别的Spark案例为基础,实战

使用Java编写并运行Spark应用程序

本文转载自:http://shiyanjun.cn/archives/742.html 我们首先提出这样一个简单的需求:现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况.这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http:

Mapreduce+Hive+Spark+Scala平台搭建

Mapreduce+Hive+Spark+Scala平台搭建 说明 平台搭建成功后,使用Java语言进行算法设计和应用的开发.文末有使用java设计的逻辑回归(Logistics Regression)做小数据集的二分类问题. 一.搭建准备 VMWare Workstation Ubuntu 14.04 Server.iso Xshell--远程连接主机终端 Server 版安装配置 新建三台虚拟机,安装时选择OpenSHH Server预安装环境 一台作为master 另两台作为slave,命