Beam从零开始(一)

网上看了别人都在谈Beam,你说咱们作为技术人员技术也得紧跟着时代不是,所以也开始利用业余时间研究Beam。咱不是大神,不能啥都一看就会,所以一天一天来,这个也就作为笔记吧。废话不多说,进入主题,按照老规矩,从官网入手。

其实Beam官网目前做的不是很丰满,不过好在按照步骤进行,可以接受。

Beam是什么呢?英文中Beam是光束的意思,官方对Beam的解释是:Apache Beam是一个开源的统一的编程模型(记住,他是个模型而已),我们可以使用它来创建数据处理管道(核心是管道)。我们首先要定义一个程序,使用开源的BeamSDK来定义管道。然后管道由Beam支持的分布式处理后端之一执行:Apache
Apex,Apache Flink,Apache Spark,Google Cloud Dataflow。

Beam对于尴尬并行数据处理任务特别有用,其中问题可以被分解为可以独立和并行处理的许多较小的数据束。我们同样可以使用Beam来提取,变换和加载(ETL)任务和纯数据集成。这些任务对于不同存储介质和数据源之间移动数据,将数据转化成理想格式,或将数据加载到新的系统上有很大的好处。

Beam管道运行器将我们定义的处理管道和程序转化为与我们选择的分布式处理后端兼容的API。当我们运行Beam程序的时候,我们需要为执行管道的后端指定适当的运行器(Runner)。

好了,上面就是简短的理论基础,下面开始我们经典的wordcount环节。不过我打算绕过官方的QuickStart环节,因为这个真的没啥意思,我们直接自己手动创建项目然后开始学习。

我们从Minimal WordCount开始说起,下面我简称:MW。MW演示了一个可以从文本中读取的管道。应用转换来对单词进行标记和计数,并将结果写入到输出文件中。下面是详细步骤:

首先我们创建一个maven项目,如图:

然后在pom文件中加入我们的依赖:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>0.4.0</version>
</dependency>

接着创建我们的第一个类:Day01,然后在其中创建main方法,那么到此我们的准备工作完毕。下面开始编写代码:

Creating the Pipeline

创建Beam管道的第一步是创建一个PipelineOptions对象,这个对象让我们对我们的管道设置各种选项,例如将要执行我们管道的管道线程以及所选择的线程所需的任何指定配置。我们可以为我们的Pipeline指定一个Runner。比如DataflowRunner或者SparkRunner。当我们不指定的时候,将会默认调用本地的DirectRunner。这里我跟官网不同,我使用最为简单的本地读取。

所以我可以直接创建Pipeline对象:

PipelineOptions pipe = PipelineOptionsFactory.create();
// 当我们不指定的时候,会默认使用DirectRunner这种类型
//  pipe.setRunner(DirectRunner.class);

Pipeline p = Pipeline.create(pipe);

【注意】如果这里直接运行会报错,说本地找不到DirectRunner类(能导入的那个不是我们需要的),因为缺少依赖,在pom中增加:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>0.4.0</version>
</dependency>

就可以成功解决问题。创建了管道,我们就可以对管道进行转化了。

每个转换采取某种输入,然后产生一些输出数据。输入和输出数据被SDK类:PCollection所表示。PCollection是一个特殊的类,他由Beam的SDK提供,我们可以用来代表几乎任何大小的数据集。流程图如下:

文本文件读取操作被用于Pipeline本身,他生成PCollection作为输出,输出PCollection中的每个元素表示输入文件中的一行文本。那么我们首先创建一个文件:

demo.txt为我们新创建的文件,里面内容:

tom
cat
hello

然后我们开始进行读取:

p.apply(TextIO.Read.from("D:\\JavaProject\\Beam_Demo\\src\\main\\resources\\demo.txt"))

读取完毕我们需要对内容进行处理,在每个元素上调用DoFn方法的ParDo转换,将文本行标记为单个单词,该文本的输入是由前一个TextIO.Read转换生成的文本行的PCollection。ParDo同样转换输出为新的PCollection,其中每个元素表示文本中的单个词:

.apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        for (String word : c.element().split(" ")) {
            if (!word.isEmpty()) {
                c.output(word);
            }
        }
    }
}))

接下来我们需要对每个单词进行统计,SDK提供的count变换是一种通用的转换,它采用任何类型的PCollection,并返回key/value类型的PCollection。每个key表示来自于集合的唯一元素,每个value表示key出现的总次数。

.apply(Count.<String>perElement())

下面的转换将唯一word和出现次数的每个key/value对格式化为适用于写入输出文件的可打印字符串。MapElements是一个更高级别的复合变换,它封装了一个简单的ParDo。对于PCollection中的每个元素,MapElements应用只产生一个元素的函数。在本例中,MapElements调用执行格式化的simpleFunction(匿名内部类),作为输入,

MapElements获得由count生成的key/value对的PCollection。并产生可打印字符串的新PCollection。

.apply("FormatResult", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
    @Override
    public String apply(KV<String, Long> input) {
        return input.getKey() + ": " + input.getValue();
    }
})).apply(TextIO.Write.to("D:\\JavaProject\\Beam_Demo\\src\\main\\resources\\wordcounts"));

然后我们开始运行:

p.run().waitUntilFinish();

运行后我们得到结果:

每个里面是word的统计结果,应该是hash分区,所以出现三个文件:

tom: 1
hello: 1
cat: 1

那么计算结束,这里仅仅是一个简单的入门,后面还会继续深入。

感谢开源。

时间: 2024-10-12 19:03:47

Beam从零开始(一)的相关文章

CSDN日报20170327——《写在阿里游戏的最后几天》

[程序人生]写在阿里游戏的最后几天 作者:leon 即将离开阿里游戏,在离开之前,写点什么,一是理一下这一年多在阿里游戏的经历在我的人生路上处于什么样的位置,一是作为一个阿里游戏人试图分析一下阿里和游戏的关系,三是作为一个在一线游戏研发从业 6 年的老人如果能为后来的游戏人带来点点参考那更好. [机器学习]机器学习入门系列02,Regression 回归:案例研究 作者:YoferZhang 没有比较好的数学基础,直接接触深度学习会非常抽象,所以这里我们先通过一个预测 Pokemon Go 的

iOS block从零开始

iOS block从零开始 在iOS4.0之后,block横空出世,它本身封装了一段代码并将这段代码当做变量,通过block()的方式进行回调. block的结构 先来一段简单的代码看看: void (^myBlock)(int a) = ^(int a){ NSLog(@"%zd",a); }; NSLog(@"旭宝爱吃鱼"); myBlock(999); 输出结果: 2016-05-03 11:27:18.571 block[5340:706252] 旭宝爱吃鱼

从零开始学android&lt;android事件的处理方式.二十四.&gt;

在android中一共有 多种事件,每种事件都有自己相对应的处理机制 如以下几种 1 单击事件 View.OnClickListener public abstract void onClick (View v) 单击组件时触发 2 单击事件 View.OnLongClickListener public abstract boolean onLongClick (View v) 长按组件时触发 3 键盘事件 View.OnKeyListener public abstract boolean

从零开始编写自己的C#框架(9)——数据库设计与创建

对于千万级与百万级数据库设计是有所区别的,由于本项目是基于中小型软件开发框架来设计,记录量相对会比较少,所以数据库设计时考虑的角度是:与开发相结合:空间换性能:空间换开发效率:减少null异常......当然不同的公司与项目要求不同,初学者要学会适应不同的项目开发要求,使用本框架开发时,必须严格按照本章节的要求来设计数据库,不然可能会产生不可控的异常. 从零开始编写自己的C#框架 数据库设计规范   文件状态: [√] 草稿 [  ] 正式发布 [  ] 正在修改 文件标识: C#框架 当前版本

Linux运维--从零开始

闲来无事,想写点东西.一来作为分享,二来也作为记录方便日后查看. 我会把我所学到的有关Linux运维的所有知识,进行分享. 博主性格比较随性,不能保证多久更新,也许一天一篇,也许一周,目标是一周3篇. 不敢保证所有东西都是对的,也希望更多的大牛指导交流. 既然从零开始,那就简单说下什么是Linux? Linux是一个操作系统,和你现在使用的Windows或MacOS一样,是一个人与机器沟通的一个"桥梁". Linux有很多发行版,常见的有Ubuntu.Debian.RHEL.Cento

从零开始:微信小程序新手入门宝典

为了方便大家了解并入门微信小程序,特将可能会需要的知识,列在这里,让大家方便的从零开始学习 一:微信小程序的特点 张小龙:张小龙全面阐述小程序,推荐通读此文: 小程序是一种不需要下载.安装即可使用的应用,它出现了触手可及的梦想,用户扫一扫或者搜一下即开打开应用,也出现了用完即走的理念,用户不用关心安装太多应用的问题,应用随处可用,但又无须安装卸载.我当时是这样来定义什么是小程序的. 1:无需下载:我们直接使用它,所以无须安装是小程序最基础的一个特性: 2:触手可及:当我们拿着智能手机接触周边的时

IC卡解密从零开始学1 (也许会有2) 解密工具V2 V3大放送 By:lookyour

前段时间发了一个破解的PN532工具,详见 ===========================IC卡解密工具 PN532工具XP 爆破版http://www.52pojie.cn/thread-597896-1-1.html IC卡解密从零开始学2  解密工具PN532-mfoc-mfcuk-GUIhttp://www.52pojie.cn/thread-604402-1-1.html =========================有很多人私信和回复希望有个详细点的介绍... 好吧,本着

SD从零开始41-44

[原创] SD从零开始41 科目确定(Account determination) 使用科目确定Using Account Determination 你将需要在几个不同的领域确定将要记账的科目: 用于记账销售收入,销售扣除和增值税的总账科目在数据从billing document传输到FI时自动地确定: 当处理现金销售时,必须在凭证中设置一个总账科目用于现金结算(不会记账到客户账户): 到4.0版本时,可以确定一个不同于付款方客户主数据中输入的科目的统御科目: 当使用payment cards

SD从零开始38-40

[原创]SD从零开始38 创建Billing Document 根据需要BillingBilling On Request 你可以通过手工输入凭证的号码(订单号码和Delivery note,依赖于你想要执行订单相关的还是交货相关的Billing)明确地指定哪些交易将要Billing: 如果Billing无法产生,例如因为一个billing block,系统会发行一个错误日志: 到4.5版本,在为deliveries或ordersBilling时你还可以选择单个的items或者items的部分数