spark Basic code demo

spark-shell --master=spark://namenode01:7077 --executor-memory 2g --driver-class-path /app/spark141/lib/mysql-connector-java-5.1.6-bin.jar

hdfs dfs -put README.md ./
val file=sc.textFile("hdfs:///user/hadoop/README.md").filter(line=>line.contains("spark"))
val wordcount=sc.textFile("hdfs:///user/hadoop/README.md").flatMap(_.split(‘ ‘)).map((_,1)).reduceByKey(_+_)
wordcount.saveTextFile("/data/result")

//sort by count
val wordcount2=sc.textFile("hdfs:///user/hadoop/README.md").flatMap(_.split(‘ ‘)).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey().map(x=>(x._2,x._1))
wordcount2.saveAsTextFile("/data/wordcount2")

//启动hive metasotre service SPARK sql show
nohup hive --service metastore > metastore.log 2>&1 &
注意:如果要使用hive,需要将hive-site.xml文件复制到conf/下
pssh " cp /app/hive/lib/mysql-connector-java-5.1.6-bin.jar /app/spark141/lib/"
spark-shell --master=spark://namenode01:7077 --executor-memory 2g --driver-class-path /app/spark141/lib/mysql-connector-java-5.1.6-bin.jar
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.sql("use test")
hiveContext.sql("show tables").collect().foreach(println)

spark-sql --driver-class-path /app/spark141/lib/mysql-connector-java-5.1.6-bin.jar
just like use hive , write sql
use test
show tables

//parallelize show
val num=sc.parallelize(1 to 10)
val alpha=sc.parallelize(‘a‘ to ‘z‘)
val num2=num.map(_*2).collect().foreach(println)
val num3=num.map(_%3==0).collect().foreach(println)
val num3=num.filter(_%3==0).collect().foreach(println)

num.reduce(_+_)
num.reduce(_*_)
num.reduceByKey(_+_)
num.sortBy(x=>x,false)
//K-V演示
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
kv1.sortByKey().collect //注意sortByKey的小括号不能省 asc
kv1.sortByKey(false).collect //desc
//how to sort by value?
kv1.map(x=>(x._2,x._1)).sortByKey().map(x=>(x._2,x._1)).collect
kv1.sortBy(x=>x).collect
kv1.groupByKey().collect
kv1.reduceByKey(_+_).collect

val kv2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))
kv2.distinct.collect
kv1.union(kv2).collect

val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
kv1.cogroup(kv3).collect

val kv4=sc.parallelize(List(List(1,2),List(3,4)))
kv4.flatMap(x=>x.map(_+1)).collect
时间: 2024-08-19 01:50:01

spark Basic code demo的相关文章

Setup Spark source code environment

1. Install Java and set JAVA_HOME 2. Install IntelliJ IDH and Scala plugin 3. Download spark1.0.0 4. Generate a workspace for IDEA [[email protected] spark-1.0.0]$ export http_proxy=proxy01.cd.intel.com:911[[email protected] spark-1.0.0]$ sbt/sbt -Dh

Code demo no tab

Code ? import java.util.Random; import java.awt.*; //引入的包,Graphics所在的包 import java.applet.*; ? class Particle { protected int x; protected int y; protected final Random rng = new Random(); ? public Particle(int initialX, int initialY) { x = initialX;

spark mllib prefixspan demo

./bin/spark-submit ~/src_test/prefix_span_test.py source code: import os import sys from pyspark.mllib.fpm import PrefixSpan from pyspark import SparkContext from pyspark import SparkConf sc = SparkContext("local","testing") print(sc)

basic code

/*带权并查集带权值的并查集只不过是在并查集中加入了一个value[]数组value[]可以记录很多东西,也可是类似距离这种东西,也可以是相对于根节点的状态加入了权值,相对于并查集函数有些改变*/ 1 int findfat(int x){ 2 if(fat[x]==x) return x; 3 int temp=fat[x]; 4 fat[x]=findfat(fat[x]); 5 //在此处修改val比如 6 value[x]=value [temp]+1; 7 return fat[x];

Spark RDD与共享变量简介

hadoop有两个东东:HDFS(存储)和MapReduce(计算).MapReduce计算比较慢,于是Spark(速度是MR的10~100倍)出现了.Spark有两个核心的概念:弹性分布式数据集RDD与共享变量.下面进行一下简单的介绍. 弹性分布式数据集(RDD)获得方式:1 并行化驱动程序内的集合; 2 从外部数据集加载. 1 并行化驱动程序内的集合code demo val data = Array(1,2,3,4,5,6,7,8,9)   //普通数组 val distData = sc

Spark Streaming初步使用以及工作原理详解

在大数据的各种框架中,hadoop无疑是大数据的主流,但是随着电商企业的发展,hadoop只适用于一些离线数据的处理,无法应对一些实时数据的处理分析,我们需要一些实时计算框架来分析数据.因此出现了很多流式实时计算框架,比如Storm,Spark Streaming,Samaz等框架,本文主要讲解Spark Streaming的工作原理以及如何使用. 一.流式计算 1.什么是流? Streaming:是一种数据传送技术,它把客户机收到的数据变成一个稳定连续的流,源源不断地送出,使用户听到的声音或看

spark自定义分区及示例代码

有时自己的业务需要自己实现spark的分区函数 以下代码是实现一个自定义spark分区的demo 实现的功能是根据key值的最后一位数字,写到不同的文件 例如: 10写入到part-00000 11写入到part-00001 . . . 19写入到part-00009 给读者提供一个自定义分区的思路 import org.apache.spark.{Partitioner, SparkContext, SparkConf} //自定义分区类,需继承Partitioner类 class Usrid

搭建Spark源码研读和代码调试的开发环境

转载自https://github.com/linbojin/spark-notes/blob/master/ide-setup.md Table of Contents 源码获取与编译 从Github上获取Spark源码 编译Spark项目 源码导入与代码运行 导入源码到Intellij IDEA 16 运行实例代码 1. 配置运行参数 2. 添加缺失的flume sink源代码 3. 添加运行依赖的jars 4. 成功运行实例代码 单步调试源代码 工欲善其事,必先利其器,第一篇笔记介绍如何搭

Spark log

在测试spark计算时,将作业提交到yarn(模式–master yarn-cluster)上,想查看print到控制台这是imposible的,因为作业是提交到yarn的集群上,so 去yarn集群上看日志是很麻烦的,但有特别想看下print的信息,方便调试或者别的目的 在Spark的conf目录下,把log4j.properties.template修改为log4j.properties,原来的内容如下: <code style="display: block; padding: 0p