环境
Centos7
RocketMQ 3.2.6
安装位置 /usr/local/alibaba-rocketmq
外网ip 182.254.145.66
内网ip 10.105.23.114
安装
wget https://github.com/alibaba/RocketMQ/releases/download/v3.2.6/alibaba-rocketmq-3.2.6.tar.gz
tar -zxvf alibaba-rocketmq-3.2.6.tar.gz
cd alibaba-rocketmq/bin
启动
nohup sh mqnamesrv -n 10.105.23.114:9876 &
nohup sh mqbroker -n 10.105.23.114:9876 &
java测试
使用maven构建环境
[html] view plain copy
- <!-- http://mvnrepository.com/artifact/com.alibaba.rocketmq/rocketmq-client -->
- <dependency>
- <groupId>com.alibaba.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>3.2.3</version>
- </dependency>
[java] view plain copy
- package rocketmq;
- import java.util.Date;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
- import com.alibaba.rocketmq.client.producer.SendResult;
- import com.alibaba.rocketmq.common.message.Message;
- public class Producer {
- public static void main(String[] args) throws MQClientException, InterruptedException {
- DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
- producer.setNamesrvAddr("182.254.145.66:9876");
- producer.setInstanceName("rmq-instance");
- producer.start();
- try {
- for (int i = 0; i < 3; i++) {
- Message msg = new Message("TopicA-test",// topic
- "TagA",// tag
- (new Date() + "Hello RocketMQ ,QuickStart" + i)
- .getBytes()// body
- );
- SendResult sendResult = producer.send(msg);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- producer.shutdown();
- }
- }
- package rocketmq;
- import java.util.List;
- import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
- import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import com.alibaba.rocketmq.client.exception.MQClientException;
- import com.alibaba.rocketmq.common.message.MessageExt;
- public class Consumer {
- public static void main(String[] args) throws InterruptedException, MQClientException {
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
- consumer.setNamesrvAddr("182.254.145.66:9876");
- consumer.setInstanceName("rmq-instance");
- consumer.subscribe("TopicA-test", "TagA");
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(
- List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
- for (MessageExt msg : msgs) {
- System.out.println(new String(msg.getBody()));
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.out.println("Consumer Started.");
- }
- }
运行consumer后发现
com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <10.105.23.114:10911> failed
在nohup.out里发现
The broker[localhost, 10.105.23.114:10911] boot success. and name server is 182.254.145.66:9876
哎,看来还是外网内网ip的问题
上次在安装Tair的时候就碰到过类似的问题 详见 Centos7安装Tair及配置测试
最后经过多方搜索,在官方的用户说明里看到下面的方法
经过我修改后的broker.p(修改后需要用 -c 指定该配置文件重新启动broker:nohup sh mqbroker -c broker.p &)
[html] view plain copy
- namesrvAddr=127.0.0.1:9876
- brokerIP1=182.254.145.66
- brokerName=localhost
- brokerClusterName=DefaultCluster
- brokerId=0
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- rejectTransactionMessage=false
- fetchNamesrvAddrByAddressServer=false
- storePathRootDir=/root/store
- storePathCommitLog=/root/store/commitlog
- flushIntervalCommitLog=1000
- flushCommitLogTimed=false
- deleteWhen=04
- fileReservedTime=72
- maxTransferBytesOnMessageInMemory=262144
- maxTransferCountOnMessageInMemory=32
- maxTransferBytesOnMessageInDisk=65536
- maxTransferCountOnMessageInDisk=8
- accessMessageInMemoryMaxRatio=40
- messageIndexEnable=true
- messageIndexSafe=false
- haMasterAddress=
- brokerRole=ASYNC_MASTER
- flushDiskType=ASYNC_FLUSH
- cleanFileForciblyEnable=true
ok!
这说明什么?说明第一手资料很重要
参考资料
http://www.jialeens.com/archives/681.html
http://www.cnblogs.com/xiaodf/p/5075167.html
http://blog.csdn.net/dlf123321/article/details/51514263