Spark 单机 Demo.

安装好Spark 后,官方自带了一些demo, 路径在  Spark根目录/examples/src/main/python/

里面有些例子,例如统计字数的 wordcount.py

import sys
from operator import add

from pyspark import SparkContext

import sys
reload(sys)
sys.setdefaultencoding("utf-8")

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: wordcount <file>"
        exit(-1)
    sc = SparkContext(appName="PythonWordCount")
    lines = sc.textFile(sys.argv[1], 1)
    counts = lines.flatMap(lambda x: x.split(‘ ‘))                   .map(lambda x: (x, 1))                   .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print "%s: %i" % (word, count)

    sc.stop()

另外参考Spary 的 python api: http://spark.apache.org/docs/latest/api/python/pyspark.html

写了一个小demo,就是练习一下api的使用,做业务很方便。针对于大数据文件做统计分析的。比如几十兆上百兆的我们单机处理,上G的就放在hadoop 的 hdfs上。

下面是一个学生成绩单。四列字段:学生,以及三科成绩。其中学生有重复的(比如额外加分的情况,需要合并分析)。

yang    85  90  30
wang    20  60  50
zhang   90  90  90
li  100 54  0
yanf    0   0   0   
yang 12 0 0

当然实际中数据要很多,比如很多列,而且几十万行甚至几百万行。这里是一个demo ,相当于在部署前测试。

在 Spark根目录/example/src/main/python/ 下新建一个 students.py :

#coding=utf-8

import sys
from operator import add
from pyspark import SparkContext

import sys
reload(sys)
sys.setdefaultencoding("utf-8")

def map_func(x):
    s = x.split()
    return (s[0],[int(s[1]),int(s[2]),int(s[3])])

def f(x):
    return x
    rank = sc.parallelize(range(0,sorted.count()))

def add(a,b):
    return [a[r]+ b[r] for r in range(len(a))]

def _merge(a,b):
    print ‘****‘
    return [a[r]+ b[r] for r in range(len(a))]

#the students who has one score is 100
def has100(x):
    for y in x:
        if(y==100):
            return True
    return False

def allIs0(x):
    if(type(x) == list and sum(x) == 0):
        return True
    return False

def subMax(x,y):
    m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i] for i in range(3)]
    return(‘‘,m)

def sumAll(x,y):
    return (‘‘,[x[1][i]+y[1][i] for i in range(3)])

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: students <file>"
        exit(-1)
    sc = SparkContext(appName="Students")
    #加载学生文件,调用map将学生映射成keyValues.其中,key是学生,Value是学生成绩。map后的结果如(‘yang‘,(85,90,30))
    # 之后调用 CombineByKey,将相同学生的成绩相加(合并)。然后调用cache, 将整个数据缓存,以便多次进行reduce而无需每次都重新生成。
    lines = sc.textFile(sys.argv[1], 1).map(map_func).combineByKey(f,add,_merge).cache()
    #print lines
    count = lines.count()
    
    # 获取学生中三科成绩有满分的,调用filter来实现
    whohas100 = lines.filter(lambda x: filter(has100,x)).collect()
    # 获取三科中所有成绩都是0的同学(缺考)
    whoIs0 = lines.filter(lambda x: filter(allIs0,x)).collect()
    # 获取每个学生的成绩总和
    sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
    # 获取三科中,单科最高分
    subM = lines.reduce(subMax)
    # 获取学生单科成绩的总和,求单科平均分用
    sumA = lines.reduce(sumAll)
    # 总分最高的学生
    maxScore = max(sumScore,key = lambda x: x[1])
    # 总分最低的学生
    minScore = min(sumScore,key = lambda x: x[1])
    # 所有学生三科成绩平均分
    avgA = [x/count for x in sumA[1]]
    # 根据总分进行排序(默认由小而大)
    sorted = lines.sortBy(lambda x: sum(x[1]))
    # 排序并附带序号
    sortedWithRank = sorted.zipWithIndex().collect()
    # 取出成绩最高的前三名同学,发奖!
    first3 = sorted.takeOrdered(3,key = lambda x: -sum(x[1]))
   
    #print ‘*‘*50
    print whohas100
    print maxScore
    print whoIs0
    print subM
    print avgA
    print sorted.collect()
    print sortedWithRank
    print first3
    
    #将结果汇总输出到文件
    file = open(‘/home/yanggaofei/downloads/result.txt‘,‘w‘)
    file.write(‘students num:‘+`count`+ ‘\n‘)
    file.write(‘who has a 100 scores:‘ + str(whohas100) + ‘\n‘)
    file.write(‘who all is 0:‘ + str(whoIs0) + ‘\n‘)
    file.write(‘the max score of each subject:‘ + str(subM) + ‘\n‘)
    file.write(‘the avg score of each subject:‘ + str(avgA) + ‘\n‘)
    file.write(‘sorted the students:‘ + str(sorted.collect()) + ‘\n‘)
    file.write(‘sorted the students with the rank:‘ + str(sortedWithRank) + ‘\n‘)
    file.write(‘the first 3 who will get the award:‘ + str(first3) + ‘\n‘)
    file.close()

好了,运行:

[[email protected] spark-1.1.1]# ./bin/spark-submit examples/src/main/python/students.py temp/student.txt

运行结果result.txt如下:

students num:5
who has a 100 scores:[(u‘li‘, [100, 54, 0])]
who all is 0:[(u‘yanf‘, [0, 0, 0])]
the max score of each subject:(‘‘, [100, 90, 90])
the avg score of each subject:[61, 58, 34]
sorted the students:[(u‘yanf‘, [0, 0, 0]), (u‘wang‘, [20, 60, 50]), (u‘li‘, [100, 54, 0]), (u‘yang‘, [97, 90, 30]), (u‘zhang‘, [90, 90, 90])]
sorted the students with the rank:[((u‘yanf‘, [0, 0, 0]), 0), ((u‘wang‘, [20, 60, 50]), 1), ((u‘li‘, [100, 54, 0]), 2), ((u‘yang‘, [97, 90, 30]), 3), ((u‘zhang‘, [90, 90, 90]), 4)]
the first 3 who will get the award:[(u‘zhang‘, [90, 90, 90]), (u‘yang‘, [97, 90, 30]), (u‘li‘, [100, 54, 0])]

Spark的运行过程会打印出任务执行的开始过程以及结束。表示没研究透,不做陈述。。。相比hadoop,Spark 是一个内存计算的MapReduce, 通过缓存机制,在性能上要好很多。它自身不带数据系统。但是支持 hdfs,mesos,hbase。文本文件等。它就是一个MapReduce框架。

深入了解可以看这篇文章 http://spark.apache.org/docs/latest/programming-guide.html

时间: 2024-10-10 06:25:10

Spark 单机 Demo.的相关文章

Spark单机编译(on CentOS 6)

注:1. 编译Spark之前,需要搭建Java和Scala环境,参见http://www.cnblogs.com/kevingu/p/4418779.html. 2. Spark之前使用sbt进行编译,现在建议使用maven并兼容sbt,但会逐步淘汰sbt编译方式.本文使用Maven工具编译Spark 1.2.0. 一.Maven工具搭建 (I)从http://maven.apache.org/download.cgi下载Maven二进制安装包apache-maven-3.2.5-bin.tar

python spark kmeans demo

官方的demo from numpy import array from math import sqrt from pyspark import SparkContext from pyspark.mllib.clustering import KMeans, KMeansModel sc = SparkContext(appName="clusteringExample") # Load and parse the data data = sc.textFile("/ro

spark单机环境搭建以及快速入门

1 单机环境搭建 系统环境 cat /etc/centos-release CentOS Linux release 7.3.1611 (Core) 配置jdk8 wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.

spark集群配置以及java操作spark小demo

spark 安装 配置 使用java来操作spark spark 安装 tar -zxvf spark-2.4.0-bin-hadoop2.7.tgz rm spark-2.4.0-bin-hadoop2.7.tgz mv spark-2.4.0-bin-hadoop2.7 spark sudo vim /etc/profile export SPARK_HOME=/usr/local/storm export PATH=$PATH:$SPARK_HOME/bin source /etc/pro

Spark单机环境安装

1.ubantu环境下安装JDK 我的jdk安装在/home/fuqiang/java/jvm目录下,scala,spark都是在此目录下,主要是JDK环境变量的设置$ sudo gedit /etc/profile在文档的最末尾加上export JAVA_HOME=/home/fuqiang/java/jvm/jdk1.7.0_79export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$CLASSPATHexport PATH=$JAVA_H

spark单机模式

1.下载spark,解压2.复制conf/spark-env.sh和conf/log4j.properties cp spark-env.sh.template spark-env.sh cp log4j.properties.template log4j.properties 3.编辑spark-env.sh,设置SPARK_LOCAL_IP,docker-1为主机名,对应IP为10.10.20.204 export SPARK_LOCAL_IP=docker-1 4.运行example,执行

安装spark单机环境

(假定已经装好的hadoop,不管你装没装好,反正我是装好了) 1 下载spark安装包 http://spark.apache.org/downloads.html 下载spark-1.6.1-bin-without-hadoop.tgz 2 将spark包解压到自己指定目录 然后在spark中指定hadoop jar包的路径 先执行 hadoop classpath把输出内容记下来 在spark conf路径下新建spark-env.sh 然后输入以下内容:(注意:::::::把hadoop

Mac spark 单机部署

因为应用需要开始学习数据处理技术,网上多使用spark,随大流也选用spark (spark性能是hadoop的100倍,我也是道听途说,没有亲测.) 1.ssh免密登录配置 Mac 自带ssh 不需安装,只需要生成秘要并放入秘要文件中即可 生成秘要文件: ssh-keygen -t rsa第一个输入提示是 生成文件名可以直接回车使用默认的文件名,如果默认文件名已经有文件存在会有提示是否覆盖,根据提示输入yes即可覆盖原有文件.紧接着提示输入密码和确认密码.生成的文件默认在~/.ssh/目录中,

Spark 伪分布式 & 全分布式 安装指南

0.前言 3月31日是 Spark 五周年纪念日,从第一个公开发布的版本开始,Spark走过了不平凡的5年:从刚开始的默默无闻,到13年的鹊起,14年的大爆发.Spark核心之上有分布式的机器学习,SQL,streaming和图计算库. 4月1日 spark 官方正式宣布 Spark 2.0 对Spark重构,更好支持手机等移动终端.Databricks创始人之一hashjoin透漏了相关的重构方法:利用Scala.js项目把Spark代码编译成JavaScript,然后利用Safari / C