kafka解释三的具体:发展Kafka应用

一个、整体外观Kafka

我们知道。Kafka系统有三大组件:Producer、Consumer、broker 。

producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume)。

二、开发一个Producer应用

Producers用来生产消息并把产生的消息推送到Kafka的Broker。Producers能够是各种应用。比方web应用。server端应用,代理应用以及log系统等等。

当然。Producers如今有各种语言的实现比方Java、C、Python等。

我们先看一下Producer在Kafka中的角色:



watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvc3VpZmVuZzMwNTE=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast" >   

2.1.kafka Producer 的 API

Kafka中和producer相关的API有三个类

  • Producer:最基本的类。用来创建和推送消息
  • KeyedMessage:定义要发送的消息对象,比方定义发送给哪个topic,partition key和发送的内容等。
  • ProducerConfig:配置Producer。比方定义要连接的brokers、partition class、serializer class、partition key等

2.2以下我们就写一个最简单的Producer:产生一条消息并推送给broker

package bonree.producer;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/*******************************************************************************
 * BidPlanStructForm.java Created on 2014-7-8
 * Author: <a href=mailto:[email protected]>houda</a>
 * @Title: SimpleProducer.java
 * @Package bonree.producer
 * Description:
 * Version: 1.0
 ******************************************************************************/
public class SimpleProducer {
	private static Producer<Integer,String> producer;
	private final Properties props=new Properties();
	public SimpleProducer(){
		//定义连接的broker list
		props.put("metadata.broker.list", "192.168.4.31:9092");
		//定义序列化类(Java对象传输前要序列化)
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		producer = new Producer<Integer, String>(new ProducerConfig(props));
	}
	public static void main(String[] args) {
		SimpleProducer sp=new SimpleProducer();
		//定义topic
		String topic="mytopic";
		//定义要发送给topic的消息
		String messageStr = "send a message to broker ";
		//构建消息对象
		KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
		//推送消息到broker
		producer.send(data);
		producer.close();
	}
}

三、开发一个consumer应用

Consumer是用来消费Producer产生的消息的,当然一个Consumer能够是各种应用。如能够是一个实时的分析系统。也能够是一个数据仓库或者是一个基于公布订阅模式的解决方式等。Consumer端相同有多种语言的实现,如Java、C、Python等。

我们看一下Consumer在Kafka中的角色:

3.1.kafka Producer 的 API

Kafka和Producer略微有些不同。它提供了两种类型的API

  • high-level consumer API:提供了对底层API的抽象,使用起来比較简单
  • simple consumer API:同意重写底层API的实现,提供了很多其它的控制权,当然使用起来也复杂一些

因为是第一个应用,我们这部分使用high-level API,它的特点每消费一个message自己主动移动offset值到下一个message。关于offset在后面的部分会单独介绍。与Producer类似,和Consumer相关的有三个基本的类:

  • KafkaStream:这里面返回的就是Producer生产的消息
  • ConsumerConfig:定义要连接zookeeper的一些配置信息(Kafka通过zookeeper均衡压力,详细请查阅见面几篇文章)。比方定义zookeeper的URL、group id、连接zookeeper过期时间等。
  • ConsumerConnector:负责和zookeeper进行连接等工作

3.2以下我们就写一个最简单的Consumer:从broker中消费一个消息

package bonree.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/*******************************************************************************

 * Created on 2014-7-8 Author: <a
 * href=mailto:[email protected]>houda</a>
 * @Title: SimpleHLConsumer.java
 * @Package bonree.consumer Description: Version: 1.0
 ******************************************************************************/
public class SimpleHLConsumer {
	private final ConsumerConnector consumer;
	private final String topic;

	public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
		Properties props = new Properties();
		//定义连接zookeeper信息
		props.put("zookeeper.connect", zookeeper);
		//定义Consumer全部的groupID,关于groupID,后面会继续介绍
		props.put("group.id", groupId);
		props.put("zookeeper.session.timeout.ms", "500");
		props.put("zookeeper.sync.time.ms", "250");
		props.put("auto.commit.interval.ms", "1000");
		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
		this.topic = topic;
	}

	public void testConsumer() {
		Map<String, Integer> topicCount = new HashMap<String, Integer>();
		//定义订阅topic数量
		topicCount.put(topic, new Integer(1));
		//返回的是全部topic的Map
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
		//取出我们要须要的topic中的消息流
		List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
		for (final KafkaStream stream : streams) {
			ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
			while (consumerIte.hasNext())
				System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
		}
		if (consumer != null)
			consumer.shutdown();
	}

	public static void main(String[] args) {
		String topic = "mytopic";
		SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.4.32:2181", "testgroup", topic);
		simpleHLConsumer.testConsumer();
	}

}

四、执行查看结果

先启动server端相关进程:

  • 执行zookeeper:[[email protected] kafka-0.8]# bin/zookeeper-server-start.sh config/zookeeper.properties
  • 执行Kafkabroker:[[email protected] kafka-0.8]# bin/kafka-server-start.sh config/server.properties

再执行我们写的应用

  • 执行刚才写的SimpleHLConsumer 类的main函数,等待生产者生产消息
  • 执行SimpleProducer的main函数。生产消息并push到broker

结果:执行完SimpleProducer后在SimpleHLConsumer的控制台就可以看到生产者生产的消息:“send a message to broker”。

版权声明:本文博客原创文章。博客,未经同意,不得转载。

时间: 2024-08-04 20:39:59

kafka解释三的具体:发展Kafka应用的相关文章

kafka学习(三)-kafka集群搭建

kafka集群搭建 下面简单的介绍一下kafka的集群搭建,单个kafka的安装更简单,下面以集群搭建为例子. 我们设置并部署有三个节点的 kafka 集合体,必须在每个节点上遵循下面的步骤来启动 kafka 服务器,kafka集群需要依赖zookeeper集群,上一篇已经说道了zookeeper的搭建,方法请参考:http://www.cnblogs.com/chushiyaoyue/p/5615267.html 1.环境准备 测试服务器(2n+1)奇数台 192.168.181.128 ce

kafka深入研究之路(2) kafka简介与专业术语解释说明

目录:1.kafka简介 什么是kafka? 设计目标是什么?2.kafka的优缺点3.kafka中专业术语解释说明 官方网站: http://kafka.apache.org/introkafka中文教程 http://orchome.com/kafka/index 1/ kafka 简介Kafka是最初由Linkedin公司开发,是一个分布式.分区的.多副本的.多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志.访问日志,消息服务等

Apache Kafka系列(三) Java API使用

Apache Kafka系列(一) 起步 Apache Kafka系列(二) 命令行工具(CLI) Apache Kafka系列(三) Java API使用 摘要: Apache Kafka Java Client API 一.基本概念 Kafka集成了Producer/Consumer连接Broker的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如: 1.创建Topic 2.罗列出已存在的Topic 3.对已有Topic的Produce/Consume测试

kafka入门三:写第一个Kafka应用

一.整体看一下Kafka 我们知道,Kafka系统有三大组件:Producer.Consumer.broker . producers 生产(produce)消息(message)并推(push)送给brokers,consumers从brokers把消息提取(pull)出来消费(consume). 二.开发一个Producer应用 Producers用来生产消息并把产生的消息推送到Kafka的Broker.Producers可以是各种应用,比如web应用,服务器端应用,代理应用以及log系统等

kafka的三种部署模式

/************* *kafka 0.8.1.1的安装部署 *blog:www.r66r.net *qq:26571864 **************/ 相关部署视频地址:http://edu.51cto.com/course/course_id-2374.html kafka的部署模式为3种模式 1)单broker模式 2)单机多broker模式 (伪集群) 3)多机多broker模式 (真正的集群模式) 第一种模式安装 1.在hadoopdn2机器上面上传kafka文件,并解压到

Kafka的三种客户端线程模型和一个小惊喜

Kafka 作为一个流式数据平台,对开发者提供了三种客户端:生产者 / 消费者.连接器.流处理.本文着重分析这三种客户端的线程模型.看到最后的通常都有惊喜.消费者的线程模型0.8 版本以前的消费者客户端会创建一个基于 ZK 的消费者连接器,一个消费者客户端是一个 Java 进程,消费者可以订阅多个主题,每个主题也可以多个线程.为了让消息在多个节点被分布式地消费,提高消息处理的吞吐量,Kafka 允许多个消费者订阅同一个主题,这些消费者需要满足"一个分区只能被一个消费者中的一个线程处理"

Kafka设计解析(五)- Kafka性能测试方法及Benchmark报告

本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/12/31/KafkaColumn5_kafka_benchmark 摘要 本文主要介绍了如何利用Kafka自带的性能测试脚本及Kafka Manager测试Kafka的性能,以及如何使用Kafka Manager监控Kafka的工作状态,最后给出了Kafka的性能测试报告. 性能测试及集群监控工具 Kafka提供了非常多有用的工具,如Kafka设计解析(三)- Kafka High Avail

Kafka设计解析(一)- Kafka背景及架构介绍

本文转发自Jason’s Blog,原文链接 http://www.jasongj.com/2015/03/10/KafkaColumn1 摘要 Kafka是由LinkedIn开发并开源的分布式消息系统,因其分布式及高吞吐率 而被广泛使用,现已与Cloudera Hadoop,Apache Storm,Apache Spark集成.本文介绍了Kafka的创建背景,设计目标,使用消息系统的优势以及目前流行的消息系统对比.并介绍了Kafka的架 构,Producer消息路由,Consumer Gro

【转】Kafka某topic无法消费解决方案&amp;Kafka某Topic数据清理

由于项目原因,最近经常碰到Kafka消息队列某topic在集群宕机重启后无法消费的情况.碰到这种情况,有三步去判断原因所在: step A:如果用kafka串口(即console-consumer)是可以正常消费该topic,则排除kafka集群出现故障 step B:若平台业务能正常消费其他topic的消息,则排除平台业务代码逻辑问题 step C:不到万不得已,则只能手动删除kafka的对应topic的Log,但是清理Kafka Log又不能单纯的去删除中间环节产生的日志,中间关联的很多东西