ActiveMQ学习之Broker

一、概念

相当于一个ActiveMQ服务实例

Broker其实就是实现了用代码的形式启动了ActiveMQ将MQ嵌入到java代码中,以便随时用随时启动,在用的时候再去启动这样能节约资源,也保证了可靠性。

二、按照不同配置文件启动ActiveMQ

1、 先将ActiveMQ根目录下conf文件夹中的activemq.xml复制一份并重命名为activemq02.xml

命令(cp activemq.xml activemq02.xml)

2、启动activemq02.xml,默认启动的是activemq.xml

命令(./activemq start xbean:file:/usr/local/activeMQ/apache-activemq-5.15.11/conf/activemq02.xml)

三、嵌入式Broker

用ActiveMQ Broker作为独立的消息服务器来构建java应用。ActiveMQ也支持在虚拟机中通信,基于嵌入式的broker,能够无缝的集成其他java应用

四、代码

1、pom.xml中引入包

  <!--activemq-->
    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.9</version>
    </dependency>
    <!--fastjson-->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.9</version>
    </dependency>

2、broker代码


import org.apache.activemq.broker.BrokerService;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/10 16:04
 * @Version: 1.0
 */
public class EmbedBroker {
    public static void main(String[] args) throws Exception {
        //ActiveMQ也支持在虚拟机中通信,嵌入broker
        BrokerService brokerService=new BrokerService();
        //将activeMQ嵌入到java程序中
        brokerService.setUseJmx(true);
        //现在是将activeMQ嵌入到java程序中,所以使用本机
        brokerService.addConnector("tcp://127.0.0.1:61616");
        //启动程序
        brokerService.start();
    }

}

3、队列生产者代码


import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/2 17:04
 * @Version: 1.0
 */
public class ActiveMQTest {
    //url路径
    private static final String ACTRIVE_URL="tcp://127.0.0.1:61616";
    //队列名称
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //1、创建连接工厂
        //如果账号密码没有修改的话,账号密码默认均为admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);

        try {
            //2、通过连接工厂获取连接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3、创建session会话
            //里面会有两个参数,第一个为事物,第二个是签收
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //4、创建目的地(具体是队列还是主题),这里是创建队列
            Queue queue=session.createQueue(QUEUE_NAME);
            //5、创建消息生产者,队列模式
            MessageProducer messageProducer = session.createProducer(queue);
            //6、通过messageProducer生产三条消息发送到MQ消息队列中
            for (int i=0;i<3;i++){
                //7、创建消息
                TextMessage textMessage = session.createTextMessage("msg----->" + i);//创建一个文本消息

                //8、通过messageProducer发送给mq
                messageProducer.send(textMessage);
                //9、数据非持久化
                messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
            }
            messageProducer.close();
            session.commit();
            session.close();
            connection.close();
            System.out.println("消息发送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

3、队列消费者代码

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @ProjectName: springbootActiveMQ
 * @Package: cn.**.test
 * @Author: huat
 * @Date: 2020/1/3 8:47
 * @Version: 1.0
 */
public class ActiveMQConsumer {
    //url路径
    private static final String ACTRIVE_URL="tcp://127.0.0.1:61616";
    //队列名称
    private static final String QUEUE_NAME="queue01";

    public static void main(String[] args) {
        //1、创建连接工厂
        //如果账号密码没有修改的话,账号密码默认均为admin
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL);
        //如果账号密码修改的话
        //第一个参数为账号,第二个为密码,第三个为请求的url
        //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL);
        try {
            //2、通过连接工厂获取连接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //3、创建session会话
            //里面会有两个参数,第一个为事物,第二个是签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //4、这里接受的queue的名称要和发送者的一致
            Queue queue = session.createQueue(QUEUE_NAME);
            //5、创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            //6、通过监听的方式消费消息
            while(true){
                //MessageConsumer 调用的receive方法为同步调用,在消息到达之前一直阻塞线程
                //用什么格式发送,这里就用什么格式接受
                //receive等待消息,不限制时间
                TextMessage message=(TextMessage)consumer.receive();

                //receive带参数等待消息,限制时间,单位毫秒
                //TextMessage message=(TextMessage)consumer.receive(4000L);

                if(null != message){
                    System.out.println("接受的消息为------>"+message.getText());
                }else{
                    break;
                }
            }
            //7、闭资源
            consumer.close();
            session.close();
            connection.close();

        }catch (Exception e){
            e.printStackTrace();
        }
    }

}

来源:站长平台

原文地址:https://www.cnblogs.com/1994jinnan/p/12178085.html

时间: 2024-08-07 14:41:43

ActiveMQ学习之Broker的相关文章

ActiveMQ学习笔记(五)——使用Spring JMS收发消息

ActiveMQ学习笔记(四)http://my.oschina.net/xiaoxishan/blog/380446 中记录了如何使用原生的方式从ActiveMQ中收发消息.可以看出,每次收发消息都要写许多重复的代码,Spring 为我们提供了更为方便的方式,这就是Spring JMS.我们通过一个例子展开讲述.包括队列.主题消息的收发相关的Spring配置.代码.测试. 本例中,消息的收发都写在了一个工程里. 1.使用maven管理依赖包 <dependencies> <depend

ActiveMQ学习笔记(六)——JMS消息类型

1.前言 ActiveMQ学习笔记(四)--通过ActiveMQ收发消息http://my.oschina.net/xiaoxishan/blog/380446 和ActiveMQ学习笔记(五)--使用Spring JMS收发消息http://my.oschina.net/xiaoxishan/blog/381209   中,发送和接受的消息类型都是TextMessage,即文本消息(如下面的代码所示).显然消息类型只有文本类型是不能满足要求的. //发送文本消息  session.create

ActiveMQ学习教程(二)——简单示例

ActiveMQ学习教程(二)--简单示例 一.应用IDEA构建Maven项目 File->New->Module...->Maven->勾选->选择->Next -> GroupId:com.jd.myMaven   |    ArtifactId:activeMQ    |    version:默认   ->Finish 项目构建成功!项目结构如下所示: 二.创建生产者类,模拟生产者发消息 Step1:java/activemq/JMSProducer

activemq 学习系列(六) activemq 集群

activemq 集群 activemq 是可以通过 networkConnectors 来实现集群的. 首页准备多台 activemq 1:端口号:8161,服务端口号:61616 2:端口号:8162,服务端口号:61617 3:端口号:8163,服务端口号:61618 然后在任意一台的 /conf/activemq.xml 的 broker 节点中加入一个子节点 <networkConnectors> <networkConnector uri="static:(tcp:

ActiveMQ学习笔记(二)--安装ActiveMQ

一.Windows安装AMQ AMQ下载地址:https://archive.apache.org/dist/activemq/5.13.0/apache-activemq-5.13.0-bin.zip JDK1.7 步骤一:把apache-activemq-5.13.0-bin.zip解压到一个目录下,例如D:\MQ\apache-activemq\apache-activemq-5.13.0. 目录结构介绍如下. bin,放置启动.停止.注册服务等命令文件. conf,配置文件,包括brok

解决ActiveMQ的“Invalid broker URI”异常的历程

000 最近碰到一个问题,把解决的过程记录下来. 故障原因 同事的应用上线,Tomcat无法正常启动.抛出这样的异常: org.springframework.beans.PropertyBatchUpdateException; nested PropertyAccessExceptions (1) are:|PropertyAccessException 1: org.springframework.beans.MethodInvocationException: Property 'bro

activeMQ学习(2)---------点对点、发布订阅的消息代码实现

以下是个人在学习activemq时从网上找到的资料, 总结留给自己以后复习 点对点的实现 2 @Test public void sendMessage(){ 3 try { 4 // 创建一个连接工厂 5 String url = "tcp://localhost:61616"; 6 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 7 // 设置用户名和密码,这个用户名

【ActiveMQ入门-5】ActiveMQ学习-消息持久性

ActiveMQ中的消息持久性     ActiveMQ很好的支持了消息的持久性(Persistence).消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据

【Active入门-3】ActiveMQ学习-发布者与订阅者

2015年4月28日 1个发布者,1个订阅者,topic 方式1: 先发布消息: 然后订阅消息: 方式2: 先订阅消息: 然后发布消息:订阅者如下: 结论1: 从上面可以看出,消息发布需要在线发布. 1个发布者,2个订阅者,topic 方式1: 先发布消息: 然后开启两个订阅者 方式2: 先开启两个订阅者: 然后发布消息: 看到订阅者如下: 方式3: 先发布消息,发布完消息后,让发布者休眠10s,然后退出: 此时开启订阅者: 直到发布者程序退出: 订阅者也没有订阅到任何消息 方式4: 上图中,消