【原创】Kafka接受发送消息对象Object基础版

首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1)

kafka常用的发送消息的方法如下:

Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String str = "test";
producer.send(new KeyedMessage<String, String>("exhibition",str));

但是如果用kafka发送对象的话就需要重写serializer.class中byte[] toBytes方法:

Producer示例:其中MessageBean是自己定义的实体类:

Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
props.put("serializer.class", "com.performanceTest.BeanSerializer"); // 需要修改
props.put("metadata.broker.list","slaves2:9092,slaves3:9092,slaves4:9092");
ProducerConfig config = new ProducerConfig(props);
Producer<MessageBean, MessageBean> producer = new Producer<MessageBean, MessageBean>(config);
MessageBean str =  new MessageBean();
	str.setFromJID("2"+i);
	str.setToJID("3"+i);
	str.setMessage("京"+i);
	str.setSendtime(System.currentTimeMillis());
KeyedMessage<MessageBean, MessageBean> data = new KeyedMessage<MessageBean, MessageBean>("exhibition",str);
	producer.send(data);

com.performanceTest.BeanSerializer代码:

package com.performanceTest;
import com.performanceTest.BeanUtils;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;
import com.performanceTest.MessageBean;
public class BeanSerializer implements Encoder<MessageBean>{

	 public BeanSerializer(VerifiableProperties props) {

	 }

	@Override
	public byte[] toBytes(MessageBean mb) {
		System.out.println("encoder ---> " + mb);
		return BeanUtils.object2Bytes(mb);
	}

}

BeanUtils的代码:

public class BeanUtils {
	public static Object bytes2Object(byte[] bytes) {
		Object obj = null;
		ByteArrayInputStream bais = null;
		ObjectInputStream ois = null;
		try {
			bais = new ByteArrayInputStream(bytes);
			ois = new ObjectInputStream(bais);
			obj = (Object) ois.readObject();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				ois.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

		return obj;
	}

	public static byte[] object2Bytes(Object obj) {
		byte[] bytes = null;
		ByteArrayOutputStream baos = null;
		ObjectOutputStream oos = null;
		try {
			baos = new ByteArrayOutputStream();
			oos = new ObjectOutputStream(baos);
			oos.writeObject(obj);
			bytes = baos.toByteArray();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				oos.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}

		return bytes;
	}
}

Consumer示例:

Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
		byte[] bytes = it.next().message();
		MessageBean mb = (MessageBean) BeanUtils.bytes2Object(bytes);
                ...
                ...
                ...
}

OK,至此基本的应用kafka传输接受对象的例子完毕,尝试看过高端的代码如SimpleConsumer,基础不够,实在费劲,接着努力吧~~~~

PS:转载请注明出处

时间: 2024-10-25 16:05:45

【原创】Kafka接受发送消息对象Object基础版的相关文章

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag

XMPP接受发送消息

在现阶段的通信服务中,各种标准都有,因此会出现无法实现相互连通,而XMPP(Extensible Message and presence Protocol)协议的出现.实现了整个及时通信服务协议的互通.有了这个协议之后.使用不论什么一个组织或者个人提供的即使通信服务,都可以无障碍的与其它的及时通信服务的用户进行交流.比如google 公司2005年推出的Google talk就是一款基于XMPP协议的即时通信软件. 以下我们就谈论一下怎样简单的使用XMPP的接收和发送消息 1.在XMPPFra

log4j2发送消息至Kafka

title: 自定义log4j2发送日志到Kafka 图片描述(最多50字) tags: log4j2,kafka 为了给公司的大数据平台提供各项目组的日志,而又使各项目组在改动上无感知.做了一番调研后才发现log4j2默认有支持将日志发送到kafka的功能,惊喜之下赶紧看了下log4j对其的实现源码!发现默认的实现是同步阻塞的,如果kafka服务一旦挂掉会阻塞正常服务的日志打印,为此本人在参考源码的基础上做了一些修改. log4j日志工作流程 log4j2对于log4j在性能上有着显著的提升,

ARC-不要向已经释放的对象发送消息

一,在AppDelegate.m中写入如下代码: - (BOOL)application:(UIApplication *)application didFinishLaunchingWithOptions:(NSDictionary *)launchOptions { // Override point for customization after application launch. NSObject *object=[[NSObject alloc]init]; NSLog(@"%ld

Golang之发送消息至kafka

windows下安装zookeeper 1.安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 2.安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 3.重命名conf/zoo_sample.cfg 为conf/zoo.cfg 4.编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\ 4.运行bin/zkServer.cmd 启动结果如下: 安装kafka 1

WebSocket.之.基础入门-前端发送消息

WebSocket.之.基础入门-前端发送消息 在<WebSocket.之.基础入门-建立连接>的代码基础之上,进行添加代码.代码只改动了:TestSocket.java 和 index.jsp 两个文件. 项目结构如下图: TestSocket.java 1 package com.charles.socket; 2 3 import javax.websocket.OnMessage; 4 import javax.websocket.OnOpen; 5 import javax.webs

java客户端向单机版kafka发送消息没有接收到

kafka版本:kafka_2.11-0.10.0.0 在kafka服务器命令发送消息,消费者可以接受到, 但是在java客户端向kafka发送消息时消费者接受不到, 在kafka/config/sever.properties把这个注解打开 advertised.listeners=PLAINTEXT://ip.137:9092        #本机服务器ip 意思就是说:hostname.port都会广播给producer.consumer.如果你没有配置了这个属性的话,则使用listene

【原创】Kafka 0.11消息设计

Kafka 0.11版本增加了很多新功能,包括支持事务.精确一次处理语义和幂等producer等,而实现这些新功能的前提就是要提供支持这些功能的新版本消息格式,同时也要维护与老版本的兼容性.本文将详细探讨Kafka 0.11新版本消息格式的设计,其中会着重比较新旧两版本消息格式在设计上的异同.毕竟只有深入理解了Kafka的消息设计,我们才能更好地学习Kafka所提供的各种功能. 1. Kafka消息层次设计 不管是0.11版本还是之前的版本,Kafka的消息层次都是分为两层:消息集合(messa

用Spring发送和接受JMS消息的一个小例子

Spring提供的JmsTemplate对原生的JMS API进行了一层薄薄的封装,使用起来非常的方便. 我使用的JMS消息代理插件是Apache的ActiveMQ,建议安装最新版本,因为我之前安装老版本,各种不兼容,各种bug,最新版的activemq-all-5.9.1.jar包里面已经有了slf4j.impl包,之前就是被这个坑了...把这个jar包加到lib目录下面,就不会有各种ClassNotFound异常和类冲突的bug了. 下载ActiveMQ解压之后运行bin下面的activem