使用Java编写ActiveMQ的队列模式和主题模式

队列模式的消息演示

本小节简单演示一下如何使用JMS接口规范连接ActiveMQ,首先创建一个Maven工程,在pom.xml文件中,添加activemq的依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>

创建一个 AppProducer 类,用于演示下如何使用JMS接口规范使用ActiveMQ的队列模式。代码如下:

package org.zero01.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息生产者
 * @author: 01
 * @create: 2018-05-26 16:44
 **/
public class AppProducer {

    // activemq服务器的url地址,默认通信端口为61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 队列的名称
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂对象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.创建连接对象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.启动连接
        connection.start();

        // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建目的地(destination)
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        // 循环发送消息
        for (int i = 0; i < 100; i++) {
            // 7.创建消息,这里创建的是简单的文本消息体
            TextMessage textMessage = session.createTextMessage("test" + i);
            // 8.使用消息生产者往目的地发送消息
            producer.send(destination, textMessage);

            System.out.println("消息发送成功:" + textMessage.getText());
        }

        // 9.关闭连接
        connection.close();
    }
}

编写完代码后,登录ActiveMQ的管理页面,点击选项卡上的 “Queues” 进入到如下界面,可以看到现在这里什么数据都没有:

我们运行上面编写的代码之后,刷新该页面,可以看到现在就有数据了:

接着我们来编写一个消费者,去消费队列中的消息。代码如下:

package org.zero01.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息消费者
 * @author: 01
 * @create: 2018-05-26 17:08
 **/
public class AppConsumer {

    // activemq服务器的url地址,默认通信端口为61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 队列的名称
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂对象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.创建连接对象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.启动连接
        connection.start();

        // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建目的地(destination)
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.创建消费者
        MessageConsumer messageConsumer = session.createConsumer(destination);

        // 7.创建一个监听器
        messageConsumer.setMessageListener((message) -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收消息: " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

编写完代码后,点击选项卡上的 “Connections” 进入到如下界面,可以看到现在这里一个连接都没有:

我们运行上面编写的代码之后,刷新该页面,可以看到现在就有一个消费者连接了:

消费者运行之后,是一个线程阻塞状态的,也就是会与ActiveMQ服务器保持连接。现在我们再来启动一个消费者,如下就有两个消费者了:

启动了两个消费者后,运行生产者的代码。我们来看队列模式的一个现象,如下:

控制台打印信息如上,有没有发现,消费者1所消费的消息是偶数的,而消费者2消费的消息则是奇数的。这就是队列模式的一个现象,消费者们会均匀地、尽可能平均地消费队列中的消息。


主题模式的消息演示

主题模式的代码和队列模式的代码十分类似,只有创建目的地的方法不一样。消息发布者代码如下:

package org.zero01.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息发布者
 * @author: 01
 * @create: 2018-05-26 16:44
 **/
public class AppPublisher {

    // activemq服务器的url地址,默认通信端口为61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 队列的名称
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂对象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.创建连接对象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.启动连接
        connection.start();

        // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建目的地(destination)
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.创建生产者
        MessageProducer producer = session.createProducer(destination);

        // 循环发送消息
        for (int i = 0; i < 100; i++) {
            // 7.创建消息,这里创建的是简单的文本消息体
            TextMessage textMessage = session.createTextMessage("test" + i);
            // 8.使用消息生产者往目的地发送消息
            producer.send(destination, textMessage);

            System.out.println("消息发送成功:" + textMessage.getText());
        }

        // 9.关闭连接
        connection.close();
    }
}

消息订阅者代码如下:

package org.zero01.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息订阅者
 * @author: 01
 * @create: 2018-05-26 17:08
 **/
public class AppSubscriber {

    // activemq服务器的url地址,默认通信端口为61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 队列的名称
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.创建连接工厂对象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.创建连接对象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.启动连接
        connection.start();

        // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.创建目的地(destination)
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.创建消费者
        MessageConsumer messageConsumer = session.createConsumer(destination);

        // 7.创建一个监听器
        messageConsumer.setMessageListener((message) -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收消息: " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

但主题模式与队列模式的消费方式不太一样,队列模式是先启动消息生产者去发送消息到队列里,然后消费者再去消费。而主题模式则是先启动消息订阅者去进行订阅,然后再启动消息发布者去发布消息,这样消息订阅者才能收到消息发布者所发布的消息。所以我们先启动消息订阅者,再启动消息发布者。完成启动后,这时到ActiveMQ的 “Topics” 页面上查看信息如下:

除了以上这个区别外,我们来启动两个订阅者,然后再启动发布者,看看订阅者们接收到的消息是怎么样的:

控制台打印信息如上,可以看到两个订阅者都各自接收到了同一份消息。也就是说,如果有两个订阅者,那么消息就会有两份,有多个订阅者则有多份。

原文地址:http://blog.51cto.com/zero01/2120667

时间: 2024-10-09 17:14:26

使用Java编写ActiveMQ的队列模式和主题模式的相关文章

activeMQ队列模式和主题模式的Java实现

一.队列模式 生产者 import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activ

java ReentrantLock结合条件队列 实现生产者-消费者模式

1 package reentrantlock; 2 3 import java.util.ArrayList; 4 5 public class ProviderAndConsumerTest { 6 7 static ProviderAndConsumer providerAndConsumer = new ProviderAndConsumer(); 8 9 public static void main(String[] args) throws InterruptedException

JAVA的设计模式之观察者模式----结合ActiveMQ消息队列说明

1----------------------观察者模式------------------------------ 观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新. activeMQ消息队列 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

ActiveMQ队列、主题模式区别

1.ActiveMQ队列模式如下图,生产者创建消息到消息中间件,再"均分给消费者". 2.ActiveMQ主题模式如下图,生产者创建消息到消息中间件,消费者会接受到订阅的主题中所有的消息.在主题模式下,消费者获取不到订阅之前的中间件中的消息. 原文地址:https://www.cnblogs.com/GrapefruitTea/p/9941169.html

Java中间件-ActiveMQ

为什么需要使用消息中间件? 系统解耦 异步 横向扩展 安全可靠 顺序保证 什么是中间件? 非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件. 什么是消息中间件? 关注于数据的发送和接受,利用高效可靠的异步消息传递机制集成分布式系统. 消息中间件图示? 什么是JMS? java消息服务(java Message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

java并发之阻塞队列

在前面我们接触的队列都是非阻塞队列,比如PriorityQueue.LinkedList(LinkedList是双向链表,它实现了Dequeue接口). 阻塞队列与普通队列的区别在于:当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或

深入剖析java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

关联文章: 深入理解Java类型信息(Class对象)与反射机制 深入理解Java枚举类型(enum) 深入理解Java注解类型(@Annotation) 深入理解Java类加载器(ClassLoader) 深入理解Java并发之synchronized实现原理 Java并发编程-无锁CAS与Unsafe类及其并发包Atomic 深入理解Java内存模型(JMM)及volatile关键字 剖析基于并发AQS的重入锁(ReetrantLock)及其Condition实现原理 剖析基于并发AQS的共

Java中的阻塞队列

1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put