【Samza系列】实时计算Samza中文教程(四)—API概述

上一篇和大家一起宏观上学习了Samza平台的架构,重点讲了一下数据缓冲层和资源管理层,剩下的一块很重要的SamzaAPI层本节作为重点为大家展开介绍。

当你使用Samza来实现一个数据流处理逻辑时,你必须实现一个叫StreamTask的接口,如下所示:

public class MyTaskClass implements StreamTask {

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // process message
  }
}

当你运行你的job时,Samza将为你的class创建一些实例(可能在多台机器上)。这些任务实例会处理输入流里的消息。

在你的job的配置中你能告诉Samza你想消费哪条数据流。举一个较为完整的例子(大家也可以参看http://samza.incubator.apache.org/learn/documentation/0.7.0/jobs/configuration.html

):

# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

对于Samza从任务的输入流利接收的每一条消息,处理逻辑都会被调用。它主要包含三个重要的信息:消息、关键词key以及消息来自的数据流:

/** Every message that is delivered to a StreamTask is wrapped
 * in an IncomingMessageEnvelope, which contains metadata about
 * the origin of the message. */
public class IncomingMessageEnvelope {
  /** A deserialized message. */
  Object getMessage() { ... }

  /** A deserialized key. */
  Object getKey() { ... }

  /** The stream and partition that this message came from. */
  SystemStreamPartition getSystemStreamPartition() { ... }
}

注意键和值都要被声明为对象,并且需要转化为正确的类型。如果你不配置一个serializer/deserializer,它们就会成为典型的java字节数组。一个deserializer能够转化这些字节到其他任意类型,举个例子来说j一个son deserializer能够将字节数组转化为Map、List以及字符串对象。

SystemStreamPartition()这个方法会返回一个SystemStreamPartition对象,它会告诉你消息是从哪里来的。它由以下三部分组成:

1. The system:系统的名字来源于消息,就在你job的配置里定义。你可以有多个用于输入和输出的不同名字的系统;

2. The stream name: 在原系统里数据流(话题、队列)的名字。同样也是在job的配置里定义;

3. The partition: 一条数据流通常会被划分到多个分区,并且每一个分区会被Samza安排一个StreamTask实例;

API看起来像是这样的:

/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {

  /** The name of the system which provides this stream. It is
      defined in the Samza job‘s configuration. */
  public String getSystem() { ... }

  /** The name of the stream/topic/queue within the system. */
  public String getStream() { ... }

  /** The partition within the stream. */
  public Partition getPartition() { ... }
}

在上面这个job的配置例子里可以看到,这个系统名字叫“Kafka”,数据流的名字叫“PageViewEvent”。(kafka这个名字不是特定的——你能给你的系统取任何你想要的名字)。如果你有一些输入流向导入你的StreamTask,你能够使用SystemStreamPartition去决定你接受到哪一类消息。

如何发送消息呢?如果你看一下StreamTask里的process()方法,你将看到你有一个MessageCollector接口。

/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}

为了发送一个消息, 你会创建一个OutgoingMessageEnvelop对象并且把它传递给消息收集器。它至少会确定你想要发送的消息、系统以及数据流名字再发送出去。你也可以确定分区的key和另一些参数。具体可以参考javadoc(http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html)。

 注意事项:

请只在process()方法里使用MessageCollector对象。如果你保持住一个MessageCollector实例并且之后再次使用它,你的消息可能会错误地发送出去。举一个例子,这儿有一个简单的任务,它把每个输入的消息拆成单词,并且发送每一个单词作为一个消息:

public class SplitStringIntoWords implements StreamTask {

  // Send outgoing messages to a stream called "words"
  // in the "kafka" system.
  private final SystemStream OUTPUT_STREAM =
    new SystemStream("kafka", "words");

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String message = (String) envelope.getMessage();

    for (String word : message.split(" ")) {
      // Use the word as the key, and 1 as the value.
      // A second task can add the 1‘s to get the word count.
      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
    }
  }
}

Samza的API的概要介绍就到这里吧,很多细节的API可以参看javadoc文档,这也是官网下一节的内容,由于篇幅有限,大家可以自己针对性的去深入了解了解就可以了。下一篇会讲一下之前在架构篇里多次提到的SamzaContainer。

时间: 2024-08-02 07:34:08

【Samza系列】实时计算Samza中文教程(四)—API概述的相关文章

Swift中文教程(四)--函数与闭包

原文:Swift中文教程(四)--函数与闭包 Function 函数 Swift使用func关键字来声明变量,函数通过函数名加小括号内的参数列表来调用.使用->来区分参数名和返回值的类型: 1 func greet(name: String, day: String) -> String { 2 return "Hello \(name), today is \(day)." 3 } 4 greet("Bob", "Tuesday")

【Samza系列】实时计算Samza中文教程(三)——架构

本篇紧接着概念篇,从宏观角度上看一下Samza实时计算服务的架构是什么样的? Samza是由以下三层构成: 1. 数据流层(A streaming layer) 2. 执行层(An execution layer) 3. 处理层(A progressing layer) 那Samza是依靠哪些技术完成以上三层的组合呢?如下图所示: 1. 数据流:分布式消息中间件Kafka 2. 执行:Hadoop资源调度管理系统YARN 3. 处理: Samza API 玩过大数据Hadoop的人可以类比下面的

实时计算Samza中文教程(二)——概念

希望上一篇背景篇让大家对流式计算有了宏观的认识,本篇根据官网是介绍概念,先让我们看看有哪些东西呢? 概念一:Streams Samza是处理流的.流则是由一系列不可变的一种相似类型的消息组成.举个例子,一个流可能是在一个网站上的所有点击,或者更新到一个特定数据库表的更新操作,或者是被一个服务或者事件数据生成所有日志信息.消息能够被加到另一个流之后或者从一个流中读取.一个流能有多个消费者,并且从一个流中读取不会删除消息(使得小心能够被广播给所有消费者).另外消息可以有一个关联的key用来做分区,这

实时计算Samza中文教程(一)背景

大家应该听我在前言篇里扯皮后,迫不及待要来一看Samza究竟是何物了吧?先了解一下Samza的Background是必不可少的(至少官网上是放在第一个的),我们需要从哪些技术背景去了解呢? 什么是消息(Messaging)? 消息系统是一种实现近实时异步计算的流行方案.消息产生时可以被放入一个消息队列(ActiveMQ,RabbitMQ).发布-订阅系统(Kestrel,Kafka)或者日志聚合系统(Flume.Scribe).下游消费者从上述系统读取消息并且处理它们或者基于消息的内容产生进一步

Netty4.x中文教程系列(四) 对象传输

Netty4.x中文教程系列(四)  对象传输 我们在使用netty的过程中肯定会遇到传输对象的情况,Netty4通过ObjectEncoder和ObjectDecoder来支持. 首先我们定义一个User对象,一定要实现Serializable接口: package mjorcen.netty.object; import java.io.Serializable; /** * User: hupeng Date: 14-6-3 Time: 上午1:31 */ public class Use

Netty4.x中文教程系列(六) 从头开始Bootstrap

Netty4.x中文教程系列(六) 从头开始Bootstrap 其实自从中文教程系列(五)一直不知道自己到底想些什么.加上忙着工作上出现了一些问题.本来想就这么放弃维护了.没想到有朋友和我说百度搜索推荐了我的文章.瞬间有点小激动啊.决定自己要把这个教程系列完善下去.这里诚挚的想支持我的盆友们道歉.真的是让你们失望了.我居然有想放弃的这种丧心病狂的念头.以后绝对不会了. 其实伴随着对Netty的逐步深入学习.感觉自己对netty的了解仍然有所欠缺.加上笔者语文课是美术老师教的.所以..说多了都是泪

Netty4.x中文教程系列(三) ChannelHandler

Netty4.x中文教程系列(四)  ChannelHandler 上一篇文章详细解释了Hello World示例的代码.里面涉及了一些Netty框架的基础. 这篇文章用以解释ChannelHandler.笔者本身在以前写过文章ChannelHandler改动及影响 和 ChannelInitializer 学习 对Netty的.ChannelHandler做过阐述.里面主要描述了4.x版本相对于3.x版本的改动以及影响.并引用了一些文章.为大家详细的解释了ChannelHandler里面涉及架

struts2官方 中文教程 系列十一:使用XML进行表单验证

在本教程中,我们将讨论如何使用Struts 2的XML验证方法来验证表单字段中用户的输入.在前面的教程中,我们讨论了在Action类中使用validate方法验证用户的输入.使用单独的XML验证文件让您可以内置到Struts 2框架的验证器. 贴个本帖的地址,以免被爬:struts2官方 中文教程 系列十一:使用XML进行表单验证  即 http://www.cnblogs.com/linghaoxinpian/p/6938720.html 下载本章节代码 为了使用户能够编辑存储在Person对

struts2官方 中文教程 系列九:Debugging Struts

介绍 在Struts 2 web应用程序的开发过程中,您可能希望查看由Struts 2框架管理的信息.本教程将介绍两种工具,您可以使用它们来查看.一个工具是Struts 2的配置插件,另一个是调试拦截器.本文还讨论了如何设置日志级别以查看更多或更少的日志消息. 贴个本帖的地址,以免被爬:struts2官方 中文教程 系列九:Debugging Struts 即 http://www.cnblogs.com/linghaoxinpian/p/6916619.html 下载本章节代码 Configu