使用Pyspark编写wordcount程序

# Word count on manuscript using PySpark

# import regex module
import re
# import add from operator module
from operator import add

# read input file
file_in = sc.textFile(‘/home/an/Documents/A00_Documents/Spark4Py 20150315‘)

# count lines
print(‘number of lines in file: %s‘ % file_in.count())

# add up lenths of each line
#
chars = file_in.map(lambda s: len(s)).reduce(add)
print(‘number of characters in file: %s‘ % chars)

# Get words from the input file
words =file_in.flatMap(lambda line: re.split(‘\W+‘, line.lower().strip()))

# words of more than 3 characters
words = words.filter(lambda x: len(x) > 3)

# set count 1 per word
words = words.map(lambda w: (w,1))

# reduce phase - sum count all the words
words = words.reduceByKey(add)

# create tuple (count, word) and sort in descending
words = words.map(lambda x: (x[1], x[0])).sortByKey(False)

# take top 20 words by frequency
words.take(20)

# create function for hitogram of most frequent words
#

% matplotlib inline
import matplotlib.pyplot as plt
#

def histogram(words):
count = map(lambda x: x[1], words)
word = map(lambda x: x[0], words)
plt.barh(range(len(count)), count,color = ‘grey‘)
plt.yticks(range(len(count)), word)

# Change order of tuple (word, count) from (count, word)
words = words.map(lambda x:(x[1], x[0]))
words.take(25)

# display histogram
histogram(words.take(25))

# words in one summarised statement
words = sc.textFile(‘/home/an/Documents/A00_Documents/Spark4Py 20150315‘)
.flatMap(lambda line: re.split(‘\W+‘, line.lower().strip()))
.filter(lambda x: len(x) > 3)
.map(lambda w: (w,1))
.reduceByKey(add)
.map(lambda x: (x[1], x[0])).sortByKey(False)
words.take(20)

////////////////////////////////////////////////////////////////////////////////////////////////////////////////

Spark Python API 文档

http://spark.apache.org/docs/latest/api/python/pyspark.html

官方示例

WordCount
bin/pyspark ./examples/src/main/python/wordcount.py /tmp/text
bin/spark-submit --master local --num-executors 10 ./examples/src/main/python/wordcount.py /tmp/text
bin/spark-submit --master yarn --num-executors 10 ./examples/src/main/python/wordcount.py /tmp/text

Pi
bin/spark-submit --master local --executor-memory 4G --num-executors 10 ./examples/src/main/python/pi.py

Python SparkConf

conf = SparkConf().setAppName("AppName").set("spark.executor.memory", "1g")
sc = SparkContext(conf=conf)

Python SparkContext

sc.textFile("/hdfs/path")
// 读取文件:Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

sc.union(rdd1, rdd2)
// RDDs 并集:Build the union of a list of RDDs.

sc.textFile("/hdfs/path").collect()
// 获取所有元素:Return a list that contains all of the elements in this RDD.

sc.parallelize([0, 2, 3, 4, 6], 5)
// 使用 Python 集合创建一个 RDD,一共 5 个分片:Distribute a local Python collection to form an RDD. Using xrange is recommended if the input represents a range for performance.

Python Spark RDD 交并差(交集,并集,差集)

交集: intersection
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]

差集: subtract
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.subtract(rdd2).collect()
[4, 5, 10]

并集: subtract
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.union(rdd2).collect()
[1, 10, 2, 3, 4, 5, 1, 6, 2, 3, 7, 8]
>>> rdd1.union(rdd2).distinct().collect()
[1, 2, 3, 4, 5, 6, 7, 8, 10]
时间: 2024-11-10 14:43:43

使用Pyspark编写wordcount程序的相关文章

在Pycharm上编写WordCount程序

本篇博客将给大家介绍怎么在PyCharm上编写运行WordCount程序. 第一步 下载安装PyCharm 下载Pycharm PyCharm的下载地址(Linux版本).下载完成后你将得到一个名叫:pycharm-professional-2018.2.4.tar.gz文件.我们选择的是正版软件,学生可申请免费使用.详细信息请百度. 安装PyCharm 执行以下命令解压文件: cd ~/下载 tar -xvf pycharm-professional-2018.2.4.tar.gz Shell

用Python编写WordCount程序任务

1. 用Python编写WordCount程序并提交任务 程序 WordCount 输入 一个包含大量单词的文本文件 输出 文件中每个单词及其出现次数(频数),并按照单词字母顺序排序,每个单词和其频数占一行,单词和频数之间有间隔 2.编写map函数,reduce函数 import sys for line in sys.stdin: line=line.strip() words=line.split() for word in words: print '%s\t%s' % (word,1)

MapReduce编写wordcount程序代码实现

MapReduce经典案例代码(wordcount) 以经典的wordcount为例,通过自定义的mapper和reducer来实现单词计数 package com.fwmagic.mapreduce; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本]

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本] 1. 开发环境 Jdk 1.7.0_72 Maven 3.2.1 Scala 2.10.6 Spark 1.6.2 Hadoop 2.6.4 IntelliJ IDEA 2016.1.1 2. 创建项目1) 新建Maven项目 2) 在pom文件中导入依赖pom.xml文件内容如下: <?xml version="1.0" encoding="UTF-8"?> &l

win7 64位下安装hadoop的eclipse插件并编写运行WordCount程序

win7 64位下安装hadoop的eclipse插件并编写运行WordCount程序 环境: win7 64位 hadoop-2.6.0 步骤: 1.下载hadoop-eclipse-plugin-2.6.0.jar包 2.把hadoop-eclipse-plugin-2.6.0.jar放到eclipse安装目录下的plugins目录下 3.打开eclipse发现左边多出来一个DFS Locations 4.在win7上解压hadoop-2.6.0. 5.下载hadoop.dll.winuti

hadoop wordcount程序缺陷

在wordcount 程序的main函数中,没有读取运行环境中的各种参数的值,全靠hadoop系统的默认参数跑起来,这样做是有风险的,最突出的就是OOM错误. 自己在刚刚学习hadoop编程时,就是模仿wordcount程序编写.在数据量很小,作为demo程序跑,不会有什么问题,但当数据量激增,变成以亿计算时,各种问题都会出现. 所以一定要在main函数中,增加下面的代码,让程序去读取环境配置文件,得到你希望要的参数. Configuration.addDefaultResource("hdfs

Spark在Yarn上运行Wordcount程序

前提条件 1.CDH安装spark服务 2.下载IntelliJ IDEA编写WorkCount程序 3.上传到spark集群执行 一.下载IntellJ IDEA编写Java程序 1.下载IDEA 官网地址:http://www.jetbrains.com/idea/  下载IntlliJ IDEA后,进行安装. 2.新建Java项目 1.点击File 2.点击New Project 3.点击Java 注意:Project SDK要选择本机安装的JDK的位置,由于我的JDK是1.7,所以下面的

编写hadoop程序并打成jar包上传到hadoop集群运行

准备工作: 1. hadoop集群(我用的是hadoop-2.7.3版本),这里hadoop有两种:1是编译好的hadoop-2.7.3:2是源代码hadoop-2.7.3-src: 2. 自己的机器可以是任何系统,只要支持JVM,自己的主机上必须有eclipse,以及hadoop-2.7.3和hadoop-2.7.3-src.(我用的是windows系统,为了方便Linux系统传输数据,我选用了FileZilla,一款ftp工具,具体见www.cnblogs.com/NongSi-Net/p/

spark快速入门与WordCount程序机制深度解析 spark研习第二季

2.spark wordCount程序深度剖析 标签: spark 一.Eclipse(scala IDE)开发local和cluster (一). 配置开发环境 要在本地安装好java和scala. 由于spark1.6需要scala 2.10.X版本的.推荐 2.10.4,java版本最好是1.8.所以提前我们要需要安装好java和scala并在环境变量中配置好. 下载scala IDE for eclipse安装 连接:http://scala-ide.org/download/sdk.h