【2020/2/5】寒假自学——学习进度报告12

  今天主要完成了北京市政百姓信件分析实战。



  Spark方面只是安装了Flume,以及尝试使用套接字流作为DSteam的数据源。

  启动NetCat作为套接字的监听模式,这样在端口9999就能和spark互联。

  值得一提,nc -l 9999 虽然也是适用的,-k是为了可以保持多个连接,所以应该还是必要的。

  编写DSteam代码并作为接受数据的一方。

import findspark

findspark.init()

import sys

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    conf = SparkConf().set("spark.task.cpus", "2")
    sc = SparkContext(appName="PythonStreamingNetworkWordCount", master="spark://hadoop-master:7077", conf=conf)
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))         .map(lambda word: (word, 1))         .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

  其中 lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 为使用了启动变量,所以在设置Parameters:

  值得一提,我在代码里面加了 conf = SparkConf().set("spark.task.cpus", "2") 这么一句话,是因为在我第一次运行的时候发现程序卡在了中途的阶段,因为任务需求多个核心来处理但我默认只设置了1颗核心,所以加上这句话保险。但这样做的同时需要设置worker端的核心数量(因为我只有一个worker),只需要在spark-env.sh中加入export SPARK_WORKER_CORES=4 就可以了,核心数量视情况可以提升或减少,但为了任务正常运行需要至少2。

  运行结果:

  

  可以看到程序每秒都会监听消息。

  另外明天尝试Flume作为数据源并且把结果保存到文件系统中。

原文地址:https://www.cnblogs.com/limitCM/p/12267110.html

时间: 2024-10-10 11:23:07

【2020/2/5】寒假自学——学习进度报告12的相关文章

【2020/2/2】寒假自学——学习进度报告9

因为想要通过hive作为数据库来保存爬取后和处理完成的数据,需要添加spark的hive支持,这方面还没编译完,所以今天暂时没有这方面的进度,所以写写SparkSteaming. 数据的价值随着时间的流逝而减少 这也正是MapReduce的使用范围所产生的的极大弊端,没法应对大流量的实时数据,MR这类离线处理并不能很好地解决问题. 流计算可以很好地对大规模流动数据在不断变化的运动过程中实时地进行分析,捕捉到可能有用的信息,并把结果发送到下一计算节点.而Spark中能很好地处理流计算的就是Spar

【2020/1/18】寒假自学——学习进度报告2

写博客是时隔两天,但学习并没有停止. 这一篇博客还是写一下关于Spark基础知识的,上次只是总体名词的理解. Spark的核心是建立在统一的抽象RDD之上,使得Spark的各个组件可以无缝进行集成,在同一个应用程序中完成大数据计算任务 于是RDD——由DAG图帮助形成的分布式内存的数据集帮助Spark达成了能比Hadoop快100倍的成就.每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算——这

【2020/1/25】寒假自学——学习进度报告6

这篇准备尝试RDD的编程操作. spark运行用户从文件系统中加载数据.通过并行集合(数组)创建RDD,两种都是很方便的操作方式. 应对实验,我在创建了一个文本文件.内容包括—— 之后就是尝试创建RDD. 在pyspark中使用—— >>> students=sc.textFile("file:///usr/local/spark/mycode/exp4/chapter5-data1.txt") 处理之后得到—— 然后就可以进行我们所需要的操作了,例如统计学生和统计课

【2020/1/18】寒假自学——学习进度报告3

紧接上一次. 这次是对于spark安装的总结. 首先便是下载spark. 从官网上可以找到用户提供Hadoop环境的安装包,另外值得一提的是用户也可以无需自己安装hadoop而是选择原装包括了hadoop的安装包. 放入虚拟机之后解压缩,修改权限,之后就可以开始配置了. 配置文件需要配置slaves(用于分布式配置,我只填入master的地址,所以也是伪分布吧)和spark-env.sh slaves文件设置Worker节点而spark-env.sh需要填入的内容为: export SPARK_

【2020/1/18】寒假自学——学习进度报告4

上次是安装完成了,这次就来试试Spark的基本操作. 首先是运行Spark自带的实例SparkPi. 在配置好环境变量的时候可以直接运行,但可以看到虽然运行成功但信息太过复杂,所以检索之后—— 虽然计算结果有所偏差,但多少能证明Spark的计算能力可以使用. 第二个运行的就是和计算能力没太大关联的WordCount. 创造好需要的文件(文件内存入了空格隔离的几个单词). 启动pyspark,其以交互的方式使用Python编写Spark程序. 可以看到启动界面如此. 但也有会遇到—— 的情况,这就

【2020/1/27】寒假自学——学习进度报告7

今天想记录下如何在windows环境下远程提交代码到spark集群上面运行. spark集群搭建环境使Linux系统,但说实在,Linux系统因为是虚拟机的缘故运行IDE并不是很舒服,想要对python进行舒适的编程操作还不是一件容易事,所以今天记录下如何在Windows下进行spark编程. 首先是spark的基本安装. 需要按照集群方式安装,同时虚拟机需要保证能和Windows互联互通(能ping通),这样才能有最基本的环境. 具体操作按照教程按照集群版的的spark即可,注意的是slave

【2020/2/13】寒假自学——学习进度报告16

电脑修好了,但之前落下的几天博客确实没啥补的劲头,今天先写一份报告吧. 实验内容和要求 安装 Flume 安装成功. 使用 Avro 数据源测试 Flume 配置文件 启动flume agent a1 创建指定的日志文件 客户端传输 传输完成 使用 netcat 数据源测试 Flume 创建example.conf 启动flume 启动Telnet Hellowworld 使用 Flume 作为 Spark Streaming 数据源 配置文件 拷贝依赖包 修改spark-env.sh添加环境变

【原】2014年5月学习进度报告

一.英语学习 1.使用"拓词"背托福核心词:6天 2.使用纸笔复习之前的单词:9天 一点体会:"拓词"软件确实是一款非常不错的背单词软件,通过让单词的适时反复出现,让大脑在被动的状态下识记单词.但是,长期使用后渐渐发现,能在软件里正确识记的单词却常常在日常场景中忘记词义,也就是单词重现太依赖于软件内部,于是从这个月开始,将重点放在巩固以前学习过的单词上,落实那些似认识又印象模糊的词汇. 二.阅读 <当我谈跑步时,我谈些什么>(村上春树第一本写自己的书,读

【原】2014年7月学习进度报告

1. 英语学习 每日听力30分钟(20天) YY英语角口语练习(10天) 2.阅读 <虚实之间>(充满正能量的一本书,有点巧合的是,刚刚读完这本书,芮成钢就出事了,但书的内容和观点都是很正向的.) <别告诉我你会记笔记>(一个日本人写的关于记笔记的方法论,日本人对于笔记的重视很早就见识过,也曾经给我带来很大的启发.笔记的作用常常被小视,只有真正长期坚持笔记的人才能体会到其中的好处.这是一本好书,整本书都是“干货”) <4点起床最养生和高效的时间管理>(也是一个日本人书,