kafka----kafka API(java版本)

Apache Kafka包含新的Java客户端,这些新的的客户端将取代现存的Scala客户端,但是为了兼容性,它们仍将存在一段时间。可以通过一些单独的jar包调用这些客户端,这些包的依赖性都比较小,同时老的Scala客户端仍会存在。

一、Producer   API

我们鼓励所有新开发都使用新的java版本producer。这个客户端是经过生产环境测试的,并且一般情况下会比先前的Scala客户端要更快而且具有更多的特性。你可以通过添加对客户端jar包的依赖来调用这个客户端,如下所示,使用maven配置:

<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.10.0.0</version>
	</dependency>
 
可以通过javadoc文件查看如何使用producer。
 
二、Consumer API
在0.9.0发布版本中,增加了新的java版本的consumer,用来替代已有的high-level的基于zookeeper的consumer,以及low-level的consumer APIs。
这个客户端认为是beta版本。为了保证用户获得平稳的升级,我们会继续维护0.8版本的consumer客户端,此版本客户端会在0.9版本的kafka集群中
依然生效。下面的章节中,我们会介绍老的0.8版本的consumer APIs(包括high-level的ConusmerConnector以及low-level SimpleConsumer)以及
新的Java版本的consumer API。
 
1、Old High  Level Consumer  API
class   Consumer{
/**
*  Create a ConsumerConnector:创建consumer connector
*
*  @param config at the minimum, need to specify the groupid of the consumer and the zookeeper connection string zookeeper.connect.config参数作用:需要置顶consumer的groupid以及zookeeper连接字符串zookeeper.connect
*/
 
public static kafka.javaapi.consumer.ConsumerConnector  createJavaConsumerConnector(ConsumerConfig  config);
}
 
/**
*  V: type of the message: 消息类型
*  K: type of the optional key assciated with the message: 消息携带的可选关键字类型
*/
public interface kafka.javaapi.consumer.ConsumerConnector {
/**
*  Create a list of message streams of type T for each topic.:为每个topic创建T类型的消息流的列表
*
*  @param topicCountMap a map of (topic, #streams) pair   : topic与streams的键值对
*  @param decoder a decoder that converts from Message to T  : 转换Message到T的解码器
*  @return  a map of (topic, list of KafakStream) pairs.   : topic与KafkaStream列表的键值对
*           The number of items in the list is #streams . Each stream supports
*           an iterator over message/metadata pairs .:列表中项目的数量是#streams。每个stream都支持基于message/metadata 对的迭代器
*/
public <K,V> Map<String, List<KafkaStream<K,V> > >
createMessageStreams( Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
 
 
/** * Create a list of message streams of type T for each topic, using the default decoder.为每个topic创建T类型的消息列表。使用默认解码器 */
 
public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);
/**
   *  Create a list of message streams for topics matching a wildcard.为匹配wildcard的topics创建消息流的列表
   *
   *  @param topicFilter a TopicFilter that specifies which topics to
   *                    subscribe to (encapsulates a whitelist or a blacklist).指定将要订阅的topics的TopicFilter(封装了whitelist或者黑名单)
   *  @param numStreams the number of message streams to return.将要返回的流的数量
   *  @param keyDecoder a decoder that decodes the message key  可以解码关键字key的解码器
   *  @param valueDecoder a decoder that decodes the message itself  可以解码消息本身的解码器
   *  @return a list of KafkaStream. Each stream supports an
   *          iterator over its MessageAndMetadata elements.  返回KafkaStream的列表。每个流都支持基于MessagesAndMetadata 元素的迭代器。
   */
 public <K,V> List<KafkaStream<K,V>>
    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);
 
/**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder.使用默认解码器,为匹配wildcard的topics创建消息流列表
   */  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.使用默认解码器,为匹配wildcard的topics创建消息流列表
   */  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

  /**
   *  Commit the offsets of all topic/partitions connected by this connector.通过connector提交所有topic/partitions的offsets
   */  public void commitOffsets();

  /**
   *  Shut down the connector: 关闭connector
   */  public void shutdown();
}
 
 

你可以根据这个例子学习怎样使用high level consumer api。

2、Old Simple Consumer  API
class kafka.javaapi.consumer.SimpleConsumer {
  /**
   *  Fetch a set of messages from a topic.从topis抓取消息序列
   *
   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.指定topic 名字,topic partition,开始的字节offset,抓取的最大字节数
   *  @return a set of fetched messages
   */  public FetchResponse fetch(kafka.javaapi.FetchRequest request);

  /**
   *  Fetch metadata for a sequence of topics.抓取一系列topics的metadata
   *
   *  @param request specifies the versionId, clientId, sequence of topics.指定versionId,clientId,topics
   *  @return metadata for each topic in the request.返回此要求中每个topic的元素据
   */  public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);

  /**
   *  Get a list of valid offsets (up to maxSize) before the given time.在给定的时间内返回正确偏移的列表
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object. 
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */  public kafak.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);

  /**
   * Close the SimpleConsumer.关闭
   */  public void close();
}
 
对大多数应用来说, high  level consumer  Api已经足够了,一些应用要求的一些特征还没有出现high level consumer接口(例如,
当重启consumer时,设置初始offset)。他们可以使用low level SimpleConsumer  Api。逻辑可能会有些复杂,你可以根据这个例子学习一下。
 
3、New Consumer API
 
新consumer API统一了标准,原来存在于0.8版本的high-level以及low-level consumer APIs之间差异不存在了。你可以通过使用下面maven配置方式,
指明客户端依赖的jar包,这样就可以使用新的consumer API。
 
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>

Examples showing how to use the consumer are given in the javadocs.

 
三、Streams API
在0.10.0 release版本中,我们增加了新的客户端调用库Kafka Streams,用来支持流式处理应用。Kafka Streams库认为是
alpha版本质量的,同时它的公共调用APIs在将来有可能会修改。你可以像下面maven配置模式一样,指明Kafka Streams的
依赖关系来调用Kafka Streams。
 
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency>
在Javadocs中展示了如何调用这个库(注意这些类都是不稳定的,表明以后的版本中可能会修改)。

源码地址获取mingli

有兴趣的朋友们可以前往球球哦~一起分享学习技术:2042849237

时间: 2024-10-08 04:49:54

kafka----kafka API(java版本)的相关文章

微信公众号支付 js api java版本

说起来.微信支付真是一堆坑. 居然官网都没有java版本的完整代码. 就算是php版本的.还都有错误.且前后各种版本.各种文档一大堆....不停的误导开发人员. 花了一天半时间.总算实现了微信公众号支付.和pc端的微信扫码支付.其他不说了.直接给思路 本人做的是微信V3版本的微信支付.微信的官方文档中.提供的demo 只有一些工具类.这些类还是很有作用的. https://mp.weixin.qq.com/paymch/readtemplate?t=mp/business/course3_tmp

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

kafka学习之-java api测试

1.配置 package com.storm.storm_my.kafka; /** * * @author Peng.Li * */ public class KafkaConfigApiConstant { /** * * @author 配置的key * */ public interface kafkaPropertiesKeys { public static final String ZK_CONNECT = "zookeeper.connect"; public stat

Apache kafka客户端开发-java

apache kafka中国社区QQ群:162272557 1.依赖包 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.1</version> </dependency> 2.producer程序开发例子 2.1 producer参数说明 #指定kafka节点列表,

最好用的 Kafka Json Logger Java客户端,赶紧尝试一下

最好用的 Kafka Json Logger Java客户端. slf4j4json 最好用的 Kafka Json Logger 库:不尝试一下可惜了! Description 一款为 Kafka 提供的 json logger 客户端,支持将 json 格式的 log 输出到 kafka.文件.控制台. 支持 slf4j 的全部功能. 比 KafkaLog4jAppender 更好用,可配置性更好. 支持 close logger, 在程序关闭之前 flush log to kafka. 支

Java 连接Kafka报错java.nio.channels.ClosedChannelExcep

Java 客户端连接Kafka报如下错误 bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic [2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,po

[Big Data - Kafka] Kafka设计解析(四):Kafka Consumer解析

High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播).因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义. Consumer Group High Level Consumer将从某个Partition读取的最后一条消息的offset存

SparkSql官方文档中文翻译(java版本)

1 概述(Overview) 2 DataFrames 2.1 入口:SQLContext(Starting Point: SQLContext) 2.2 创建DataFrames(Creating DataFrames) 2.3 DataFrame操作(DataFrame Operations) 2.4 运行SQL查询程序(Running SQL Queries Programmatically) 2.5 DataFrames与RDDs的相互转换(Interoperating with RDD

centos 7 java版本切换

有一个项目需要java 1.7配合,原服务器上已安装java 1.8,需要切换版本到java 1.7. 查看centos7支持的java版本 yum search java|grep java-1 java-1.6.0-openjdk.x86_64 : OpenJDK Runtime Environment java-1.6.0-openjdk-demo.x86_64 : OpenJDK Demos java-1.6.0-openjdk-devel.x86_64 : OpenJDK Develo