(二)Kafka0.8.2官方文档中文版系列-API

2. API

我们正在为Kafka重写JVM客户端。在Kafka0.8.2中,包含一个新重写的Java producer。下一个版本将包含一个等效的Java consumer。这些新客户端旨在取代现有的Scala客户端,但为了兼容性,它们将共存一段时间。这些客户端可以在一个独立的jar中使用,并且具有最小的依赖性,而旧的Scala客户端仍然与服务器打包在一起。

2.1 Producer API

在kafka0.8.2版本中,我们鼓励你使用新的java producer。这个客户端经过生产环境的测试,相比之前的scala客户端该客户端更快、有更多的特性。你可以通过添加如下maven依赖使用它:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

javadoc中可以查看如何使用producer。

对于那些对遗留Scala生产者api感兴趣的人,可以在这里找到相关信息。

2.2  High Level Consumer API

class Consumer {
  /**
   *  Create a ConsumerConnector   创建一个消费者连接器
   *
   *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
   *                 connection string zookeeper.connect.  *  参数解释:基于一个最小的配置,你只需要指定消费者组,zookeeper的连接
   */
  public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}

/**
 *  V: type of the message    消息的类型
 *  K: type of the optional key assciated with the message  与消息相关的可选的配置
 */
public interface kafka.javaapi.consumer.ConsumerConnector {
  /**
   *  Create a list of message streams of type T for each topic.
   *  为每个topic创建一个T类型的消息流列表   *
   *  @param topicCountMap  a map of (topic, #streams) pair
   *  @param decoder a decoder that converts from Message to T
   *  @return a map of (topic, list of  KafkaStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
   */
  public <K,V> Map<String, List<KafkaStream<K,V>>>
    createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

  /**
   *  Create a list of message streams of type T for each topic, using the default decoder.  *  为每个topic创建一个T类型的消息流列表,使用默认的解码器
   */
  public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

  /**
   *  Create a list of message streams for topics matching a wildcard.
   *
   *  @param topicFilter a TopicFilter that specifies which topics to
   *                    subscribe to (encapsulates a whitelist or a blacklist).
   *  @param numStreams the number of message streams to return.
   *  @param keyDecoder a decoder that decodes the message key
   *  @param valueDecoder a decoder that decodes the message itself
   *  @return a list of KafkaStream. Each stream supports an
   *          iterator over its MessageAndMetadata elements.
   */
  public <K,V> List<KafkaStream<K,V>>
    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder.   *  为与通配符匹配的消息流创建一个消息流列表,使用默认的解码器
   */
  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.   *  为与通配符匹配的消息流创建一个消息流列表,使用默认的解码器
   */
  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

  /**
   *  Commit the offsets of all topic/partitions connected by this connector.   *  提交通过该连接器关联的所有的topic和分区的偏移量
   */
  public void commitOffsets();

  /**
   *  Shut down the connector 关闭连接器
   */
  public void shutdown();
}

你可以参考这个示例去学习如何使用high level 消费者API。

2.3  Simple Consumer API

class kafka.javaapi.consumer.SimpleConsumer {
  /**
   *  Fetch a set of messages from a topic.   *  从一个topic拉取消息
   *
   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.   *         请求需要指定主题的名称、主题分区、起始偏移量、拉取数据的最大字节数
   *  @return a set of fetched messages    *         拉取回来的消息集合
   */
  public FetchResponse fetch(kafka.javaapi.FetchRequest request);

  /**
   *  Fetch metadata for a sequence of topics.   *  获取一系列主题的元数据
   *
   *  @param request specifies the versionId, clientId, sequence of topics. 需要指定版本号、客户端ID、主题
   *  @return metadata for each topic in the request.  每个请求主题的元数据
   */
  public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);

  /**
   *  Get a list of valid offsets (up to maxSize) before the given time.   *  在给定的时间之前,得到一个有效的偏移量列表(偏移量可以取到给定时间之前的最大值)
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */
  public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);

  /**
   * Close the SimpleConsumer.   关闭SimpleConsumer
   */
  public void close();
}

对大多数应用来说,the high level API是完全够用的。一些应用想要使用一些high level API没有暴露的特性(比如说,当重启消费者时指定初始的offset,即偏移量)。他们可以使用我们的low level

SimpleConsumer API。但是这个逻辑会有点复杂,你可以参考这个例子

2.4 Kafka Hadoop Consumer API

我们的一个基本用例就是,为数据聚合和加载数据到hadoop提供一个水平扩展的解决方案。为了支持这个用户用例,我们提供了一个基于hadoop的消费者,它生成了许多map任务,以并行地从Kafka集群中拉取数据。这可以非常快速的将kafka的数据加载到hadoop中(我们只用了一些Kafka服务器就完全饱和了网络,意思就是基于hadoop的consumer拉取速度很快)。

使用hadoop consumer的信息,可以在这里找到。

原文地址:https://www.cnblogs.com/dreamfor123/p/9392521.html

时间: 2024-10-11 20:53:26

(二)Kafka0.8.2官方文档中文版系列-API的相关文章

(三)Kafka0.8.2官方文档中文版系列-topic配置参数

前文链接: (一)Kafka0.8.2官方文档中文版系列-入门指南 (二)Kafka0.8.2官方文档中文版系列-API Topic-level configuration(主题级别的参数配置) 与主题相关的配置具有全局默认值(参考broker部分)和每个主题可选重写(broker部分有明确提示).如果主题没有重写这些配置,使用全局默认设置.可以使用--config添加一个或者多个自定义选项.下面这个例子创建了一个名为my-topic的主题,它自定义了最大消息大小和刷新速率: > bin/kaf

(一)Kafka0.8.2官方文档中文版系列-入门指南

写在前面的话 本系列文章仅仅代表个人的观点,结合自己的学习.使用经验,将kafka0.8.2官方文档,进行翻译,目录结构按照官方文档进行排版. 目的: 系统梳理下kafka知识点,从整体上重新认识下kafka 与广大网友进行交流,内容中难免有不合适的地方,还请大家不吝赐教,我会及时更正 尽一点点微薄之力,去帮助一些人,大家共同进步 一.Getting Started 1.1  Introduction(简介) Kafka是一个分布式.分区的.数据备份的日志收集系统.Kafka使用了一种的独特的方

(四)Kafka0.8.2官方文档中文版系列-消息传递语义

4.6 Message Delivery Semantic(消息传递语义) 现在我们了解了生产者和消费者的工作方式,让我们讨论Kafka在生产者和消费者之间提供的语义保证.显然,可以提供多种可能的消息传递保证: 最多一次 -消息可能会丢失,但永远不会被重新发送. 至少一次 -消息永远不会丢失,但可能会被重新发送. 恰好一次 - 这是人们真正想要的,每条消息只发送一次. 值得注意的是,这会分解为两个问题:发布消息的持久性保证以及消费消息时的保证. 许多系统声称提供"恰好一次"的交付语义,

2DToolkit官方文档中文版打地鼠教程(二):设置摄像机

这是2DToolkit官方文档中 Whack a Mole 打地鼠教程的译文,为了减少文中过多重复操作的翻译,以及一些无必要的句子,这里我假设你有Unity的基础知识(例如了解如何新建Sprite等).当前2D Toolkit版本为2.4. 这是一篇系列教程,全文共13节(官方文档为4章,不过为了每节有明确目的,我根据官方文档的标题拆成了13节),下面是本系列教程的所有链接: 2DToolkit官方文档中文版打地鼠教程(一):初始设置 2DToolkit官方文档中文版打地鼠教程(二):设置摄像机

2DToolkit官方文档中文版打地鼠教程(三):Sprite Collections 精灵集合

这是2DToolkit官方文档中 Whack a Mole 打地鼠教程的译文,为了减少文中过多重复操作的翻译,以及一些无必要的句子,这里我假设你有Unity的基础知识(例如了解如何新建Sprite等).当前2D Toolkit版本为2.4. 这是一篇系列教程,全文共13节(官方文档为4章,不过为了每节有明确目的,我根据官方文档的标题拆成了13节),下面是本系列教程的所有链接: 2DToolkit官方文档中文版打地鼠教程(一):初始设置 2DToolkit官方文档中文版打地鼠教程(二):设置摄像机

2DToolkit官方文档中文版打地鼠教程(一):初始设置

这是2DToolkit官方文档中 Whack a Mole 打地鼠教程的译文,为了减少文中过多重复操作的翻译,以及一些无必要的句子,这里我假设你有Unity的基础知识(例如了解如何新建Sprite等).当前2D Toolkit版本为2.4. 这是一篇系列教程,全文共13节(官方文档为4章,不过为了每节有明确目的,我根据官方文档的标题拆成了13节),下面是本系列教程的所有链接: 2DToolkit官方文档中文版打地鼠教程(一):初始设置 2DToolkit官方文档中文版打地鼠教程(二):设置摄像机

【译】StackExchange.Redis官方文档-中文版(序)

StackExchange.Redis官方文档-中文版(序) Intro 最近想深入学习一些 Redis 相关的东西.于是看了看官方的项目,发现里面有一份文档,于是打算翻译成中文,方便大家学习参考,如果有什么翻译不准确的地方,欢迎大家指出. 翻译进度 文档还有一部分还未翻译完,我会争取在三月中旬前将剩下的部分翻译结束. 翻译进度详见:https://github.com/WeihanLi/StackExchange.Redis-docs-cn 文档地址: 原文文档地址: https://gith

TensorFlow 官方文档中文版

http://wiki.jikexueyuan.com/list/deep-learning/ TensorFlow 官方文档中文版 你正在阅读的项目可能会比 Android 系统更加深远地影响着世界! 缘起 2015年11月9日,Google发布人工智能系统TensorFlow并宣布开源,同日,极客学院组织在线TensorFlow中文文档翻译. 机器学习作为人工智能的一种类型,可以让软件根据大量的数据来对未来的情况进行阐述或预判.如今,领先的科技巨头无不在机器学习下予以极大投入.Faceboo

TestNG官方文档中文版(2)-annotation(转)

1. 介绍    TestNG是一个设计用来简化广泛的测试需求的测试框架,从单元测试(隔离测试一个类)到集成测试(测试由有多个类多个包甚至多个外部框架组成的整个系统,例如运用服务器). 编写一个测试的过程有三个典型步骤: * 编写测试的 业务逻辑并在代码中插入TestNG annotation    * 将测试信息添加到testng.xml文件或者build.xml中    * 运行TestNG 在欢迎页面上可以找到快速入门示例. 下面是这篇文档使用的概念: * suite由xml文件描述.它包