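First, thanks to Wang Yangting of the Kafka China community for his examples, help, and guidance. (kafka_2.9.2-0.8.1.1)
The usual way to send a message with Kafka looks like this: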
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("metadata.broker.list", "slaves2:9092,slaves3:9092,slaves4:9092");

ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);

// Send a single string message to the "exhibition" topic.
String str = "test";
producer.send(new KeyedMessage<String, String>("exhibition", str));
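To send objects instead of strings, however, serializer.class has to point to your own class that implements Encoder and overrides its byte[] toBytes method.
Producer example, where MessageBean is a user-defined entity class: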
Properties props = new Properties();
props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181");
props.put("serializer.class", "com.performanceTest.BeanSerializer"); // must be changed to the custom encoder
props.put("metadata.broker.list", "slaves2:9092,slaves3:9092,slaves4:9092");

ProducerConfig config = new ProducerConfig(props);
Producer<MessageBean, MessageBean> producer = new Producer<MessageBean, MessageBean>(config);

// i is a counter from the surrounding code, which is not shown here.
MessageBean str = new MessageBean();
str.setFromJID("2" + i);
str.setToJID("3" + i);
str.setMessage("京" + i);
str.setSendtime(System.currentTimeMillis());

KeyedMessage<MessageBean, MessageBean> data =
        new KeyedMessage<MessageBean, MessageBean>("exhibition", str);
producer.send(data);
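The post does not show MessageBean itself. As a rough sketch only, it might look like the class below: the property names come from the setters called in the producer example, while the field types, the getters, and toString are assumptions. The one hard requirement is that the class implements java.io.Serializable, because BeanUtils relies on Java's built-in object serialization.

```java
import java.io.Serializable;

// Hypothetical sketch of MessageBean. In the post it would live in
// package com.performanceTest and be declared public; both are omitted
// here so the snippet compiles on its own.
class MessageBean implements Serializable {
    private static final long serialVersionUID = 1L;

    // Field types are assumed from the values passed to the setters.
    private String fromJID;
    private String toJID;
    private String message;
    private long sendtime;

    public String getFromJID() { return fromJID; }
    public void setFromJID(String fromJID) { this.fromJID = fromJID; }

    public String getToJID() { return toJID; }
    public void setToJID(String toJID) { this.toJID = toJID; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    public long getSendtime() { return sendtime; }
    public void setSendtime(long sendtime) { this.sendtime = sendtime; }

    @Override
    public String toString() {
        return "MessageBean{from=" + fromJID + ", to=" + toJID
                + ", message=" + message + ", sendtime=" + sendtime + "}";
    }
}
```

Declaring serialVersionUID explicitly is optional, but it keeps producer and consumer compatible if the class is recompiled on only one side.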
Code for com.performanceTest.BeanSerializer:
package com.performanceTest;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

public class BeanSerializer implements Encoder<MessageBean> {

    // Kafka instantiates the encoder with a VerifiableProperties argument.
    public BeanSerializer(VerifiableProperties props) {
    }

    @Override
    public byte[] toBytes(MessageBean mb) {
        System.out.println("encoder ---> " + mb);
        return BeanUtils.object2Bytes(mb);
    }
}
Code for BeanUtils:
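package com.performanceTest;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class BeanUtils {

    // Deserializes a byte array back into an object; returns null on failure.
    public static Object bytes2Object(byte[] bytes) {
        Object obj = null;
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            obj = ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // ois stays null if the constructor threw, so guard the close.
            if (ois != null) {
                try {
                    ois.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return obj;
    }

    // Serializes an object (it must implement java.io.Serializable); returns null on failure.
    public static byte[] object2Bytes(Object obj) {
        byte[] bytes = null;
        ObjectOutputStream oos = null;
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            oos = new ObjectOutputStream(baos);
            oos.writeObject(obj);
            oos.flush();
            bytes = baos.toByteArray();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (oos != null) {
                try {
                    oos.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return bytes;
    }
}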
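Because BeanUtils is built on ObjectOutputStream and ObjectInputStream, it only works for classes that implement java.io.Serializable. For anything else, writeObject throws NotSerializableException, which BeanUtils catches and prints, so the encoder ends up returning null. The standalone sketch below illustrates both cases without Kafka; the class names Note and PlainNote and the helper isSerializable are made up for this illustration and are not part of the post's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

    // Hypothetical payload that can be serialized.
    static class Note implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;
        Note(String text) { this.text = text; }
    }

    // Hypothetical payload that does NOT implement Serializable.
    static class PlainNote {
        final String text;
        PlainNote(String text) { this.text = text; }
    }

    // Same idea as BeanUtils.object2Bytes, but exceptions are propagated.
    static byte[] toBytes(Object obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        try {
            oos.writeObject(obj);
            oos.flush();
        } finally {
            oos.close();
        }
        return baos.toByteArray();
    }

    // Same idea as BeanUtils.bytes2Object, but exceptions are propagated.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        try {
            return ois.readObject();
        } finally {
            ois.close();
        }
    }

    // Returns false when the object cannot be written with Java serialization.
    static boolean isSerializable(Object obj) throws IOException {
        try {
            toBytes(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        Note copy = (Note) fromBytes(toBytes(new Note("hello")));
        System.out.println("round trip text: " + copy.text);
        System.out.println("PlainNote serializable: " + isSerializable(new PlainNote("x")));
    }
}
```

If the producer logs a NotSerializableException stack trace from BeanUtils, the first thing to check is whether the entity class and every non-transient field type it contains are Serializable.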
Consumer example:
// consumer (a ConsumerConnector) and topic (a String) are created elsewhere and not shown here.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, Integer.valueOf(1)); // one stream for this topic

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();

while (it.hasNext()) {
    // Each message arrives as raw bytes; turn it back into a MessageBean.
    byte[] bytes = it.next().message();
    MessageBean mb = (MessageBean) BeanUtils.bytes2Object(bytes);
    ... ... ...
}
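OK, that completes a basic example of sending and receiving objects with Kafka. I also tried reading more advanced code such as SimpleConsumer, but my fundamentals are not strong enough yet and it was hard going, so I will keep working at it.
PS: Please credit the source when reposting.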
Time: 2024-10-25 16:05:45