1.
val lines=sc.textFile("hdfs://") ==加载进来成为RDD Resilient distributed dataset 弹性数据集
val errors=lines.filter(_.startsWith("ERROR")) ##transformation
val errors.persist() ##缓存RDD
val mysql_error=errors.filter(_.contains("mysql")).count ##action
val http_error=errors.filter(_.contains("http")).count ##action
2.
map是每一行数据返回一个数组,flatmap是所有的数据返回一个数组
val rdd=sc.parallelize(List(2,4,6,7,8)) --初始化rdd
val rdd1=rdd.map(2*_) --每个数乘2
rdd1.collect --显示rdd1
内行要这么写 val rdd_1=sc.parallelize(List(3,4,6,8,9)).map(3*_).filter(_>20).collect
# val rdd_count=rdd_1.flatMap(_.split(‘,‘)).map((_,1)).reduceByKey(_+_)
3.val s2=sc.textFile("/luo/s1.txt").flatMap(line=>line.split(",")).map(word=>(word,1)).reduceByKey(_+_) --路径是hdfs路径
s2.saveAsTextFile("/luo/result")
4.val s1=sc.textFile("/luo/s1.txt")
val rdd_count1=s1.flatMap(_.split(‘,‘)).map((_,1)).reduceByKey(_+_)
val rdd_count2= rdd_count1.groupByKey --按key进行分组
r1.count --有多少数据
val rdd_count3=s1.flatMap(_.split(‘,‘)).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile("/luo/result1")
5.val rd1=sc.parallelize(List((‘a‘,1),(‘a‘,2))),val rd2=sc.parallelize(List((‘b‘,1),(‘b‘,2)))
val rd3=rd1 union rd2 --结果:Array[(Char, Int)] = Array((a,1), (a,2), (b,1), (b,2))
rd1.lookup(‘a‘)
6 val r1=sc.parallelize(List((‘a‘,3),(‘a‘,5),(‘b‘,6),(‘b‘,9))) ,val r2=sc.parallelize(List((‘a‘,6),(‘a‘,12),(‘b‘,23),(‘b‘,34)))
val r3=r1 join r2 结果为:Array[(Char, (Int, Int))] = Array((b,(6,23)), (b,(6,34)), (b,(9,23)), (b,(9,34)), (a,(3,6)), (a,(3,12)), (a,(5,6)), (a,(5,12))) 笛卡尔集
r1.lookup(‘a‘)
7 val sum=sc.parallelize(List(1,2,4,5,65))
val sum1=sum.reduce(_+_) --求和
8.data.map(_.split(‘\t‘)(0)).filter(_<"201202012").count --(0) 表示取第一个元素
data.filter(_.split(‘\t‘).length==3).map(_.split(‘\t‘)(1)).map(_,1).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile("/luo") ---reduce之后数据就变成了元组 38:33
9. val rdd=sc.parallelize(List(2,4,6,7,8))
val evenfiy=(x:Int)=>if(x%2==0) x else None
10.spark-submit --master spark://127.0.0.1:7077 --name WordCountByscala --class spark.wordcount --executor-memory 1G --total-executor-cores 2 /home/luozt/spark/spark08241.jar /home/luozt/spark/README.md /home/luozt/spark_data
val tt=rdd.map(evenfiy)
10.val list="hello.world".toCharArray
val list1=1 to 10 toList
list.zip(list1)
list.zipAll(list1,‘q‘,2)
list.zipWithIndex
sparksql
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hiveContext.sql("show tables").take(10) //取前十个表看看
1.val sqlContext=new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class person(name:string,age Int)
val people=data.map(_.split(‘,‘)).map(p==>person(p(0),p(1).toInt))
people.saveAsParquetFile("/luo/luo.parquet") --写
val par=hiveContext.parquetFile("/luo/luo.parquet") --读
par.registerAsTable("par")
val ps=sql("select name from par where age>20")
2.case class word(wid:Int,aid:Int,Times:int)
val wo=sc.textFile("/luo/aa.txt").map(_.split(‘\t‘)).filter(_.length==3).map(w=>word(w(0).toInt,w(1).toInt,w(2).toInt)
wo.registerAsTable("wor")
sql("select * from wor");
3.DSL
‘wid 代表wid这一列
如:val rs=word.where(‘Times>50).where(‘wid>2).select(‘aid).limit(10)
4.spark对hive的操作
val hiveContext=new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hql("show tables")
hql("select * from words where times>50 limit 10").collect