python spark ML

window系统
1.
anaconda  或python
spark环境变量
2.
配置spark home
D:\Develop\spark-1.6.0-bin-hadoop2.6\spark-1.6.0-bin-hadoop2.6

3.
C:\Users\Administrator>pip install py4j

python for java  cpython c 与java交互就是通过py4j

pip uninstall py4j

4.
安装pyspark (不建议pip install ,) 为了版本对应,采用复制
D:\Develop\spark-1.6.0-bin-hadoop2.6\python\lib
py4j-0.9-src pyspark  复制到
D:\Develop\Python\Anaconda3\Lib\site-packages

C:\Users\Administrator>python
>>> import py4j
>>> import pyspark   ## 不报错,则安装成功

idea 版本python插件下载

 

eclipse scala IDE  安装pydev插件

python spark

环境描述
python 2.7.9
spark spark-1.6.0-bin-hadoop2.6
安装pyspark (不建议pip install ,) 为了版本对应,采用复制,注意解压文件夹名称可能有两层,脱去外层pyspark @@@@@@@
D:\Develop\spark-1.6.0-bin-hadoop2.6\python\lib
py4j-0.9-src pyspark  复制到
D:\Develop\Python\Anaconda3\Lib\site-packages

安装 pyDev
pycharm  配置成功。但是不能自动提示。

scala IDE 版本太低,官网下载最新的版本,eclispe marketplace 安装老版和新版都报错。

最后:参考bing 必应搜索,【how to install pydev on eclipse scala ide】
http://www.planetofbits.com/python/how-to-install-python-pydev-plugin-in-eclipse/
重新下载 eclipse ,下载 \PyDev 5.2.0 复制到eclipse dropins下。在eclispe marketplace中安装scala. ok.

 

eclipse 运行Python console 乱码(因为只支持gbk)

# coding:utf-8
‘‘‘
Created on 2019年10月3日

@author: Administrator

python wordcount

python print
‘‘‘
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

print "hello"
print("world")

def showResult(one):
    print(one)

if __name__ == ‘__main__‘:
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("test")
    sc=SparkContext(conf=conf)
    lines = sc.textFile("./words")
    words = lines.flatMap(lambda line:line.split(" "))
    pairWords = words.map(lambda word:(word,1))
    reduceResult=pairWords.reduceByKey(lambda v1,v2:v1+v2)
    reduceResult.foreach(lambda one:showResult(one))
hello spark
hello hdfs
hello python
hello scala
hello hbase
hello storm
hello python
hello scala
hello hbase
hello storm

  

## Demo2.py
# coding:utf-8
‘‘‘
Created on 2019年10月3日

@author: Administrator
‘‘‘
from os import sys
import random
if __name__ == ‘__main__‘:
    file = sys.argv[0] ## 本文件的路径
    outputPath = sys.argv[1]
    print("%s,%s"%(file,outputPath)) ## 真正的参数

    print(random.randint(0,255)) ## 包含0和255

pvuvdata

2019-10-01	192.168.112.101	uid123214	beijing	www.taobao.com	buy
2019-10-02	192.168.112.111	uid123223	beijing	www.jingdong.com	buy
2019-10-03	192.168.112.101	uid123214	beijing	www.tencent.com	login
2019-10-04	192.168.112.101	uid123214	shanghai	www.taobao.com	buy
2019-10-01	192.168.112.101	uid123214	guangdong	www.taobao.com	logout
2019-10-01	192.168.112.101	uid123214	shanghai	www.taobao.com	view
2019-10-02	192.168.112.111	uid123223	beijing	www.jingdong.com	comment
2019-10-03	192.168.112.101	uid123214	shanghai	www.tencent.com	login
2019-10-04	192.168.112.101	uid123214	beijing	www.xiaomi.com	buy
2019-10-01	192.168.112.101	uid123214	shanghai	www.huawei.com	buy
2019-10-03	192.168.112.101	uid123214	beijing	www.tencent.com	login
2019-10-04	192.168.112.101	uid123214	shanghai	www.taobao.com	buy
2019-10-01	192.168.112.101	uid123214	guangdong	www.taobao.com	logout
2019-10-01	192.168.112.101	uid123214	beijing	www.taobao.com	view
2019-10-02	192.168.112.111	uid123223	guangdong	www.jingdong.com	comment
2019-10-03	192.168.112.101	uid123214	beijing	www.tencent.com	login
2019-10-04	192.168.112.101	uid123214	guangdong	www.xiaomi.com	buy
2019-10-01	192.168.112.101	uid123214	beijing	www.huawei.com	buy	 

pvuv.py
# coding:utf-8
# import sys
# print(sys.getdefaultencoding()) ## ascii
# reload(sys)
# sys.setdefaultencoding("utf-8")  ## 2.x版本
# print(sys.getdefaultencoding())
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from cProfile import label
from com.sxt.spark.wordcount import showResult

‘‘‘
Created on 2019年10月3日

@author: Administrator
‘‘‘

‘‘‘
6.    PySpark统计PV,UV  部分代码
1). 统计PV,UV
2). 统计除了某个地区外的UV
3).统计每个网站最活跃的top2地区
4).统计每个网站最热门的操作
5).统计每个网站下最活跃的top3用户

‘‘‘

## 方法
def pv(lines):
    pairSite = lines.map(lambda line:(line.split("\t")[4],1))
    reduceResult = pairSite.reduceByKey(lambda v1,v2:v1+v2)
    result = reduceResult.sortBy(lambda tp:tp[1],ascending=False)
    result.foreach(lambda one:showResult(one))

def uv(lines):
    distinct = lines.map(lambda line:line.split("\t")[1] +‘_‘ + line.split("\t")[4]).distinct()
    reduceResult= distinct.map(lambda distinct:(distinct.split("_")[1],1)).reduceByKey(lambda v1,v2:v1+v2)
    result = reduceResult.sortBy(lambda tp:tp[1],ascending=False)
    result.foreach(lambda one:showResult(one))

def uvExceptBJ(lines):
    distinct = lines.filter(lambda line:line.split(‘\t‘)[3]<>‘beijing‘).map(lambda line:line.split("\t")[1] +‘_‘ + line.split("\t")[4]).distinct()
    reduceResult= distinct.map(lambda distinct:(distinct.split("_")[1],1)).reduceByKey(lambda v1,v2:v1+v2)
    result = reduceResult.sortBy(lambda tp:tp[1],ascending=False)
    result.foreach(lambda one:showResult(one))  

def getCurrentSiteTop2Location(one):
    site = one[0]
    locations = one[1]
    locationDict = {}
    for location in locations:
        if location in locationDict:
            locationDict[location] +=1
        else:
            locationDict[location] =1

    sortedList = sorted(locationDict.items(),key=lambda kv : kv[1],reverse=True)

    resultList = []
    if len(sortedList) < 2:
        resultList =  sortedList
    else:
        for i in range(2):
            resultList.append(sortedList[i])
    return site,resultList

def getTop2Location(line):
    site_locations = lines.map(lambda line:(line.split("\t")[4],line.split("\t")[3])).groupByKey()
    result = site_locations.map(lambda one:getCurrentSiteTop2Location(one)).collect()
    for elem in result:
        print(elem)

def getSiteInfo(one):
    userid = one[0]
    sites = one[1]
    dic = {}
    for site in sites:
        if site in dic:
            dic[site] +=1
        else:
            dic[site] = 1

    resultList = []
    for site,count in dic.items():
        resultList.append((site,(userid,count)))
    return resultList

‘‘‘
如下一片程序感觉有错,我写
‘‘‘
def getCurrectSiteTop3User(one):
    site = one[0]
    uid_c_tuples = one[1]

    top3List = ["","",""]
    for uid_count in uid_c_tuples:
        for i in range(len(top3List)):
            if top3List[i] == "":
                top3List[i] = uid_count
                break
            else:
                if uid_count[1] > top3List[i][1]:  ## 元组
                    for j in range(2,i,-1):
                        top3List[j] = top3List[j-1]
                    top3List[i] = uid_count
                break
    return site,top3List

‘‘‘
如下一片程序感觉有错,老师写
‘‘‘
def getCurSiteTop3User2(one):
    site = one[0]
    userid_count_Iterable = one[1]
    top3List = ["","",""]
    for userid_count in userid_count_Iterable:
        for i in range(0,len(top3List)):
            if top3List[i] == "":
                top3List[i] = userid_count
                break
            else:
                if userid_count[1]>top3List[i][1]:
                    for j in range(2,i,-1):
                        top3List[j] = top3List[j-1]
                    top3List[i] = userid_count
                break
    return site,top3List    

def getTop3User(lines):
    site_uid_count = lines.map(lambda line:(line.split(‘\t‘)[2],line.split("\t")[4])).groupByKey().flatMap(lambda one:getSiteInfo(one))
    result = site_uid_count.groupByKey().map(lambda one:getCurrectSiteTop3User(one)).collect()
    for ele in result:
        print(ele)

if __name__ == ‘__main__‘:
#     conf = SparkConf().setMaster("local").setAppName("test")
#     sc = SparkContext()
#     lines = sc.textFile("./pvuvdata")
# #     pv(lines)
# #     uv(lines)
# #     uvExceptBJ(lines)
# #     getTop2Location(lines)
#
#     getTop3User(lines)
    res = getCurrectSiteTop3User(("baidu",[(‘A‘,12),(‘B‘,5),(‘C‘,12),(‘D‘,1),(‘E‘,21),(‘F‘,20)]))
    print(res)
    res2 = getCurSiteTop3User2(("baidu",[(‘A‘,12),(‘B‘,5),(‘C‘,12),(‘D‘,1),(‘E‘,21),(‘F‘,20)]))
    print(res)

  

python pycharm anaconda 版本切换为3.5

  

 

 

  

  

 

  

  

 

原文地址:https://www.cnblogs.com/xhzd/p/11621172.html

时间: 2024-10-12 08:37:29

python spark ML的相关文章

Spark ML下实现的多分类adaboost+naivebayes算法在文本分类上的应用

1. Naive Bayes算法 朴素贝叶斯算法算是生成模型中一个最经典的分类算法之一了,常用的有Bernoulli和Multinomial两种.在文本分类上经常会用到这两种方法.在词袋模型中,对于一篇文档$d$中出现的词$w_0,w_1,...,w_n$, 这篇文章被分类为$c$的概率为$$p(c|w_0,w_1,...,w_n) = \frac{p(c,w_0,w_1,...,w_n)}{p(w_0,w_1,...,w_n)} = \frac{p(w_0,w_1,...,w_n|c)*p(c

基于Python Spark的大数据分析_pyspark实战项目课程

基于Python Spark的大数据分析(第一期) 课程介绍地址:http://www.xuetuwuyou.com/course/173 课程出自学途无忧网:http://www.xuetuwuyou.com 讲师:轩宇老师 1.开课时间:小班化教学授课,第一期开课时间为5月20号(满30人开班,先报先学!): 2.学习方式:在线直播,共8次课,每次2小时,每周2次(周三.六,晚上20:30 - 22:30),提供在线视频,课后反复学习: 3.报名课程后,请联系客服申请加入班级答疑交流QQ群:

spark ml 的例子

一.关于spark ml pipeline与机器学习 一个典型的机器学习构建包含若干个过程 1.源数据ETL 2.数据预处理 3.特征选取 4.模型训练与验证 以上四个步骤可以抽象为一个包括多个步骤的流水线式工作,从数据收集开始至输出我们需要的最终结果.因此,对以上多个步骤.进行抽象建模,简化为流水线式工作流程则存在着可行性,对利用spark进行机器学习的用户来说,流水线式机器学习比单个步骤独立建模更加高效.易用. 受 scikit-learn 项目的启发,并且总结了MLlib在处理复杂机器学习

Python Spark Tutorial

Python Spark tutorial one Python Spark tutorial two

Extending sparklyr to Compute Cost for K-means on YARN Cluster with Spark ML Library

Machine and statistical learning wizards are becoming more eager to perform analysis with Spark MLlibrary if this is only possible. It’s trendy, posh, spicy and gives the feeling of doing state of the art machine learning and being up to date with th

Spark ML Pipeline简介

Spark ML Pipeline基于DataFrame构建了一套High-level API,我们可以使用MLPipeline构建机器学习应用,它能够将一个机器学习应用的多个处理过程组织起来,通过在代码实现的级别管理好每一个处理步骤之间的先后运行关系,极大地简化了开发机器学习应用的难度.        Spark ML Pipeline使用DataFrame作为机器学习输入输出数据集的抽象.DataFrame来自Spark SQL,表示对数据集的一种特殊抽象,它也是Dataset(它是Spar

配置Ipython Nodebook 运行 Python Spark 程序

配置Ipython Nodebook 运行 Python Spark 程序 1.1.安装Anaconda Anaconda的官网是https://www.anaconda.com,下载对应的版本: 1.1.1.下载Anaconda $ cd /opt/local/src/ $ wget -c https://repo.anaconda.com/archive/Anaconda3-5.2.0-Linux-x86_64.sh 1.1.2.安装Anaconda # 参数 -b 表示 batch -p

Spark ML机器学习库评估指标示例

本文主要对 Spark ML库下模型评估指标的讲解,以下代码均以Jupyter Notebook进行讲解,Spark版本为2.4.5.模型评估指标位于包org.apache.spark.ml.evaluation下. 模型评估指标是指测试集的评估指标,而不是训练集的评估指标 1.回归评估指标 RegressionEvaluator Evaluator for regression, which expects two input columns: prediction and label. 评估

Spark ml pipeline - transforming feature - StringIndexer

在spark ml pipeline的特征提取和转换阶段,有一种transformer可以将机器学习训练数据中常见的字符串列(例如表示各种分类)转换为数值索引列,以便于计算机处理.它就是StringIndexer.它支持的索引范围为[0, numLabels)(不支持的会编码为numLabels),并且支持四种排序方式,frequencyDesc(频率最高的索引赋值为0),frequencyAsc,alphabetDesc,alphabetAsc. 假设我们有dataframe id | cat