生产者代码:
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; /** * Created by Loger * Date: 2017-08-29 * TIme: 22:43 * Description : */ public class ProducerObj3 { public static void main(String [] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("chenle"); producer.setNamesrvAddr("192.168.12.128:9876"); producer.setVipChannelEnabled(false); producer.start(); try { for(int i=0;i<10;i++){ User user = new User(i,"陈乐"+i+"号"); Message message = new Message("obj","obj",SerializeUtil.serialize(user)); SendResult send = producer.send(message); System.out.println(send.getMsgId()+":"+send.getSendStatus()); } } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }finally { if (producer!=null){ producer.shutdown(); } } } }
消费者代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * Created by Loger * Date: 2017-08-29 * TIme: 22:51 * Description : */ public class ConsumerObj2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("chenle"); /** * Push Consumer设置 * messageModel CLUSTERING 消息模型,支持以下两种1.集群消费2.广播消费 * consumeFromWhere CONSUME_FROM_LAST_OFFSET Consumer启动后,默认从什么位置开始消费 * allocateMessageQueueStrategy * allocateMessageQueueAveragely Rebalance 算法实现策略 * Subsription{} 订阅关系 * messageListener 消息监听器 * offsetStore 消费进度存储 * consumeThreadMin 10 消费线程池数量 * consumeThreadMax 20 消费线程池数量 * pullThresholdForQueue 1000 拉去消息本地队列缓存消息最大数 * pullInterval 拉消息间隔,由于是轮训,所以为0,但是如果用了流控,也可以设置大于0的值,单位毫秒 * consumeMessageBatchMaxSize 1 批量消费,一次消费杜少条消息 * pullBatchSize 32 批量拉消息,一次最多拉多少条 * */ /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setNamesrvAddr("192.168.12.128:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("obj", "obj"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); MessageExt msg = msgs.get(0); try { byte[] body = msg.getBody(); User user = (User) SerializeUtil.deserialize(body); System.out.println(user.getId()+"---"+user.getName()); }catch (Exception e){ e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //requeue 一会再消费 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // response broker ack } }); consumer.start(); System.out.println("Consumer Started."); } }
运行结果:
时间: 2024-12-25 05:18:58