Kafka实战系列--Kafka API使用体验

前言:
  kafka是linkedin开源的消息队列, 淘宝的metaq就是基于kafka而研发. 而消息队列作为一个分布式组件, 在服务解耦/异步化, 扮演非常重要的角色. 本系列主要研究kafka的思想和使用, 本文主要讲解kafka的一些基本概念和api的使用.

*) 准备工作
1) 配置maven依赖

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
</dependency>

2).配置hosts
vim /etc/hosts
把kafka集群相关的ip及其hostname, 配置到kafka客户端的本地机器

*) Kafka的基础知识
1). Broker, Zookeeper, Producer, Consumer
Broker具体承担消息存储转发工作, Zookeeper则用与元信息的存储(topic的定义/消费进度), Producer则是消息的生产者, Consumer则是消息的消费者.

2). Topic, Partition, Replication, Consumer Group
  Topic对应一个具体的队列, 在Kafka的概念中, 一个应用一个队列. 应用数据往往呈现部分有序的特点, 因此对kafka的队列, 引入partition的概念, 即可topic划分为多个partition. 单个Partition内保证有序, Partition间不保证. 这样作的好处, 是充分利用了集群的能力, 均匀负载和提高性能.
  Replication主要为了高可用性, 保证部分节点失效的恶劣情况下, 队列数据能不丢.
  Consumer Group的概念的引入, 很有创新性, 把以往传统队列(topic模式, queue模式)的属性从队列本身挪到了消费端. 若要使用queue模式, 则所有的消费端都采用统一个consumer group, 若采用topic模式, 则所有的客户端都设置为不同的consumer group. 其partition的消费进度在zookeeper有所保存.

*) Kafka API的简单样列代码

1). 生产者代码
分区类代码片段

public class SimplePartitioner implements Partitioner {
  public SimplePartitioner (VerifiableProperties props) {
  }
  public int partition(Object key, int numPartitions) {
    return (key.hashCode() & 0x0FFFFFFF) % numPartitions;
  }
}

评注: SimplePartitioner用于对消息进行分发到具体的partition中, 有消息的key来决定, 这个有点像map/reduce中的partition机制.

生产者代码片段

Properties props = new Properties();
// 配置metadata.broker.list, 为了高可用, 最好配两个broker实例
props.put("metadata.broker.list", "127.0.0.1:9092");
// serializer.class为消息的序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 设置Partition类, 对队列进行合理的划分
props.put("partitioner.class", "mmxf.kafka.practise.SimplePartitioner");
// ACK机制, 消息发送需要kafka服务端确认
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

// KeyedMessage<K, V>
//   K对应Partition Key的类型
//   V对应消息本身的类型//   topic: "test", key: "key", message: "message"
KeyedMessage<String, String> message = new KeyedMessage<String, String>("test", "key", "message");
producer.send(message);

// 关闭producer实例
producer.close();

2). 消费者代码
使用High Level Consumer的API 线程模型和Partition数最好能保持一致, 即One Thread For Partition
参考sample样例: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
代码片段如下:

public static void main(String[] args) {

  // *) 创建ConsumerConfig
  Properties props = new Properties();
  // 设置zookeeper的链接地址
  props.put("zookeeper.connect", "127.0.0.1:2181");
  // 设置group id
  props.put("group.id", "group_id");
  // kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
  props.put("auto.commit.interval.ms", "1000");

  ConsumerConfig consumerConfig = new ConsumerConfig(props);
  ConsumerConnector consumer = (ConsumerConnector) Consumer.createJavaConsumerConnector(consumerConfig);

  String topic = "test";
  int threadNum = 1;

  // *) 设置Topic=>Thread Num映射关系, 构建具体的流
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(topic,threadNum);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

  // *) 启动线程池去消费对应的消息
  ExecutorService executor = Executors.newCachedThreadPool();
  for ( final KafkaStream<byte[], byte[]> stream : streams ) {
    executor.submit(new Runnable() {
      public void run() {
        ConsumerIterator<byte[], byte[]> iter = stream.iterator();
        while ( iter.hasNext() ) {
          MessageAndMetadata<byte[] , byte[]> mam = iter.next();
          System.out.println(
            String.format("thread_id: %d, key: %s, value: %s",
                Thread.currentThread().getId(),
                new String(mam.key()),
                new String(mam.message())
              )
          );  
        }
      }
    });
  }

  try {
    Thread.sleep(1000 * 10);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

  // *) 优雅地退出
  consumer.shutdown();
  executor.shutdown();

  while ( !executor.isTerminated() ) {
    try {
      executor.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
    }
  }

}

结果输出:

thread_id: 18, key: key, value: message

Kafka实战系列--Kafka API使用体验

时间: 2024-08-06 05:34:20

Kafka实战系列--Kafka API使用体验的相关文章

Kafka实战-Kafka到Storm

1.概述 在<Kafka实战-Flume到Kafka>一文中给大家分享了Kafka的数据源生产,今天为大家介绍如何去实时消费Kafka中的数据.这里使用实时计算的模型——Storm.下面是今天分享的主要内容,如下所示: 数据消费 Storm计算 预览截图 接下来,我们开始分享今天的内容. 2.数据消费 Kafka的数据消费,是由Storm去消费,通过KafkaSpout将数据输送到Storm,然后让Storm安装业务需求对接受的数据做实时处理,下面给大家介绍数据消费的流程图,如下图所示: 从图

Kafka实战-Flume到Kafka (转)

原文链接:Kafka实战-Flume到Kafka 1.概述 前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据.下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览 下面开始今天的分享内容. 2.数据来源 Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理).关于Flume集群的Ag

漫游Kafka实战篇之客户端API

Kafka Producer APIs 旧版的Procuder API有两种:kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer.它们都实现了同一个接口: [java] view plaincopy class Producer { /* 将消息发送到指定分区 */ public void send(kafka.javaapi.producer.ProducerData<K,V> producerData); /* 批量

Kafka实战分析

1. Kafka概要设计 kafka在设计之初就需要考虑以下4个方面的问题: 吞吐量/延时 消息持久化 负载均衡和故障转移 伸缩性 1.1 吞吐量/延时 对于任何一个消息引擎而言,吞吐量都是至关重要的性能指标.那么何为吞吐量呢?通常来说,吞吐量是某种处理能力的最大值.而对于Kafka而言,它的吞吐量就是每秒能够处理的消息数或者每秒能够处理的字节数.很显然,我们自然希望消息引擎的吞吐量越大越好. 消息引擎系统还有一个名为延时的性能指标.它衡量的是一段时间间隔,可能是发出某个操作与接收到操作响应(r

(转)kafka实战

转自https://www.cnblogs.com/hei12138/p/7805475.html 1.       kafka介绍 1.1.       主要功能 根据官网的介绍,ApacheKafka?是一个分布式流媒体平台,它主要有3种功能: 1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因 2:It lets you store strea

大数据日志传输之Kafka实战教程

大数据日志传输之Kafka实战 本套课程围绕Kafka架构详细讲解kafka的核心 架构组件,broker,consumer,producer,以及日志的分段存储,稀疏索引,副本平衡,重分区, 数据同步,Kafka的核心组控制器和消费者控制器等机制. 全面讲解java 最新版的api ,指定分区消费,流控制,手动commit,异步Callback,同步的按照Partition进行批量commit等.实战集成Springboot,spring,以及会讲解到最新的exactly-once, 集成序列

漫游kafka实战篇之搭建Kafka开发环境

转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647 上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息.下面我们来搭建kafka的开发环境. 添加依赖 搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了.不过我们使用另一种更加流行的方式:使用maven管理jar包依赖. 创建好mav

【转】apache kafka监控系列-KafkaOffsetMonitor

apache kafka监控系列-KafkaOffsetMonitor 时间 2014-05-27 18:15:01  CSDN博客 原文  http://blog.csdn.net/lizhitao/article/details/27199863 主题 Apache Kafka apache kafka中国社区QQ群:162272557 概览 最近kafka server消息服务上线了,基于jmx指标参数也写到zabbix中了,但总觉得缺少点什么东西,可视化可操作的界面.zabbix中数据比

apache kafka监控系列-KafkaOffsetMonitor(转)

原文链接:apache kafka监控系列-KafkaOffsetMonitor 概览 最 近kafka server消息服务上线了,基于jmx指标参数也写到zabbix中了,但总觉得缺少点什么东西,可视化可操作的界面.zabbix中数据比较分散,不 能集中看整个集群情况.或者一个cluster中broker列表,自己写web-console比较耗时耗力,用原型工具画了一些管理界面东西,关键自 己也不前端方面技术,这方面比较薄弱.这不开源社区提供了kafka的web管理平台KafkaOffset