// Sets up a RocketMQ push consumer for the operation-log topic/tag and forwards every
// received message body, decoded as UTF-8 text, to the Comet channel CHANNEL.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constant.operationLogGroup);
// Set only after subscribe() succeeds, so a consumer whose subscription failed
// (and was logged) is not started afterwards.
boolean subscribed = false;
try {
    consumer.setNamesrvAddr(Constant.rocketQueneAddr);
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // Broadcasting model: per the RocketMQ documentation, each consumer instance in the
    // group receives every message, rather than the messages being shared out.
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.subscribe(Constant.operationLogTopic, Constant.operationLogTag);
    subscribed = true;
} catch (MQClientException e) {
    logger.error("consume operation log MQ error", e);
}
cometutil = Comet4jUtil.getInstance(CHANNEL);
consumer.registerMessageListener(new MessageListenerConcurrently() {
    /**
     * Pushes the body of each delivered message to all connections of CHANNEL.
     *
     * @param msgs    the batch of messages handed over by the consumer; every entry is
     *                forwarded, not just the first one
     * @param context consume context supplied by RocketMQ (unused here)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            byte[] body = msg.getBody();
            if (body == null) {
                // Nothing to forward; skip instead of failing inside new String(...).
                continue;
            }
            // The Charset overload cannot throw UnsupportedEncodingException, so there is
            // no checked exception left to swallow.
            cometutil.sendMesToAllConnsWithString(
                    CHANNEL, new String(body, java.nio.charset.StandardCharsets.UTF_8));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
if (subscribed) {
    try {
        consumer.start();
        logger.info("operationLogController‘s MQ consumer started.");
    } catch (MQClientException e) {
        logger.error("consume operation log MQ start error", e);
    }
}
版权声明:本文为博主原创文章,未经博主允许不得转载。
时间: 2024-10-11 18:26:41