学习ActiveMQ(二):点对点(队列)模式消息演示

  一:介绍

  点对点的消息发送方式主要建立在 消息(Message ),队列(Queue),发送者(Sender),消费者(receiver)上,Queue 存贮消息,Sender 发送消息,receive接收消息.具体点就是Sender Client通过Queue发送message ,而 receiver Client从Queue中接收消息。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。
  二:通过jms编码接口之间的关系,流程如下:

  1.创建连接Connection
  2.创建会话Session
  3.通过Session来创建其它的(MessageProducer、MessageConsumer、Destination、TextMessage)
  4.将生产者 MessageProducer 和消费者 MessageConsumer 都会指向目标 Destination
  5.生产者向目标发送TextMessage消息send()
  6.消费者设置监听器,监听消息。

  三:创建实例

  1.打开IDEA,创建一个ActiveMQ的maven项目,如下图:

  2.自己新创建两个java文件,appConsumer消费者类和appProducer生产者类,项目结构如下图:

  3.生产者代码如下:

 1 package com.liu.jms;
 2
 3 import org.apache.activemq.ActiveMQConnectionFactory;
 4
 5 import javax.jms.*;
 6
 7 public class appProducer {
 8
 9     private static final String url = "tcp://127.0.0.1:61616";//actvemq的服务器tcp连接方式
10     private static final String queueName = "queue-test";//定义队列的名称
11
12     public static void main(String[] args) throws  JMSException {
13         //1.创建connectionFactory
14         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
15         //2.创建connection
16         Connection connection = connectionFactory.createConnection();
17         //3.启动连接
18         connection.start();
19         //4.创建session
20         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
21         //5.创建destination
22         Destination destination = session.createQueue(queueName);
23         //6.创建生产者
24         MessageProducer producer = session.createProducer(destination);
25
26         for (int i = 0; i < 100; i++) {
27
28             TextMessage textMessage = session.createTextMessage("test" + i);
29             //7.发送消息
30             producer.send(textMessage);
31
32             System.out.println("发送消息" + textMessage.getText());
33
34         }
35         //8.关闭连接
36         connection.close();
37     }
38 }

  如代码所示,通过tcp方式连接了服务端,(别忘了启动服务端的服务)。链接的具体参数可以参考http://activemq.apache.org/connection-configuration-uri.html

然后创建了一个生产者,这个生产者绑定了一个以名为queueName的队列为目的源,代表着这个生产者的消息会发到这个消息队列上面去。然后通过for循环发送了一百个消息。

  4.消费者代码如下:

package com.liu.jms;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class appConsumer {

    private static final String url = "tcp://127.0.0.1:61616";
    private static final String queueName = "queue-test";

    public static void main(String[] args) throws  JMSException {
        //1.创建connectionFactory
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        //2.创建connection
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建destination
        Destination destination = session.createQueue(queueName);
        //6.创建消费者
        MessageConsumer consumer = session.createConsumer(destination);
        //7.创建一个监听器
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {

                TextMessage textMessage = (TextMessage)message;
                try {
                    System.out.println("接收到的消息:" + textMessage.getText());

                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //8.关闭连接(监听器是异步的还没有监听到消息的时候,就关闭连接了)
        //connection.close();
    }
}

  如代码所示:消费者和生产者不同的是,消费需要建立一个监听器,来监听以名为queueName的队列上是否有了消息,有消息就会接受,然后通过onMessage方法对消息进行处理。

  5.测试

  首先启动消费者这个java类,观察控制台,如下图:

  接着启动生产者的java类,观察控制台,如下图:生产了一百条消息。

  此时切换至消费的控制台,观察控制台,如下图:已经打印出了一百条消息了,说明消费者已经接受到全部一百条消息。

  6.打开activemq的控制台查看Queues:(http://127.0.0.1:8161/admin/queues.jsp)如下图所示:队列有一个名字是我们设置的queue-test,消费者也有一个就是我们创建的那个消费者类,队列中有一百条消息,被移除了一百条,也就是上面所说的,消费者接收到了这100条全部的消息。

  7.那么我启动了两个相同目标队列的消费者呢?重新测试一下看看,为方便看清结果,重启一下服务。然后运行两遍消费者,idea控制台如下图:有两个消费者,且都没有收到消息。

  8.启动生产者,如下图:生产了一百条信息。

  9.看看两个消费者的控制台,如下两张图:其中一个消费接收到的全是奇数的消息,而另一个接收到的都是偶数的消息。

  

  10.看看activemq控制台,如下图,消费者确实是两个。

  11.得出结论:当有点对点模式下,两个消费者消费的消息之和是生产者产生的消息总数,且每一个消息都只会被一个消费者接收,不会出现两个消费者接收同一消息的情况。

这一篇通过这个简单的小demo我们已经实现了点对点的通信方式,并了解它的特性。下一篇将会学习订阅发布模式。

原文地址:https://www.cnblogs.com/liuyuan1227/p/10740053.html

时间: 2024-11-07 08:08:30

学习ActiveMQ(二):点对点(队列)模式消息演示的相关文章

RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)

1.1本部分内容简介 这部分我们将要发送一个消息到多个Consumer,这部分称之为"publish/subscribe" 我们实现的方式就是发送端,发送一个消息,与此同时,多个接收端将同时接收到消息并打印在屏幕上面. 1.2exchange简介 在前面的博文中,我们的讲解是:发送端发送消息至消息队列,接收端从消息队列获取消息.现在我们来介绍一下rabbitmq的完整消息传送模型. >Producer:用来发送消息的应用程序 >queue:用来存储消息的缓存 >Con

OpenFire源码学习之二十五:消息回执与离线消息(下)

这一篇紧接着上面继续了. 方案二 基于redis的消息回执.主要流程分为下面几个步骤: 1)将消息暂存储与redis中,设置好消息的过期时间 2)客户端回执消息id来消灭暂存的消息 3)开通单独线程论坛在第1)步中的消息.根据消息的时间重新发送消息.如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线.如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表. OK,先来看看消息的存储格式吧. 1.MESSAGE消息 用户集合 SADD 

OpenFire源码学习之二十三:关于消息的优化处理

消息处理 之前有说过,openfire的消息处理策略本人并不是很喜欢.先看下openfire上脱机消息策略. 个人认为消息关于会话的消息,用户的存储量应该无限大.服务器不应该被消息吃撑了.所谓聊天通讯,这一关很重要. Openfire的消息是什么流程呢. 1.当用户登陆连接的时候.握手.认证.绑定资源.获取花名册.获取离线消息. 2.服务端会查找关系型数据库.经本人测试离线消息在数据库表中达到100万条以上的时候,查询速度非常慢,甚至会导致openfire奔溃. ..... 那么openfire

vim学习(二)之模式

vim模式 基本上 vi/vim 共分为三种模式,分别是命令模式(Command mode),输入模式(Insert mode)和底线命令模式(Last line mode). 命令模式: 用户刚刚启动 vi/vim,便进入了命令模式. 此状态下敲击键盘动作会被Vim识别为命令,而非输入字符.比如我们此时按下i,并不会输入一个字符,i被当作了一个命令. 以下是常用的几个命令: i 切换到输入模式,以输入字符. x 删除当前光标所在处的字符. : 切换到底线命令模式,以在最底一行输入命令. 若想要

设计模式学习总结(二十三)--状态模式

定义 在很多情况下,一个对象的行为取决于它的一个或多个变化的属性,这些属性我们称之为状态,这个对象称之为状态对象.对于状态对象而已,它的行为依赖于它的状态,比如你要预订房间,那么只有当该房间为空闲时你才能预订,你想入住该房间也只有当你预订了该房间或者该房间为空闲时.对于这样的一个对象,当它在于外部事件产生互动的时候,其内部状态就会发生改变,从而使得他的行为也随之发生改变. 状态模式就是允许对象在内部状态发生改变时改变它的行为,看起来好像修改了它的类. 角色 Context: 环境类.可以包括一些

activeMQ队列模式和主题模式的Java实现

一.队列模式 生产者 import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activ

ActiveMQ_点对点队列(二)

一.本文章包含的内容 1.列举了ActiveMQ中通过Queue方式发送.消费队列的代码(普通文本.json/xml字符串.对象数据) 2.spring+activemq方式 二.配置信息 1.activemq的pom.xml信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <!--activemq  Begin-->        <dependency>            <groupId>org.springframew

使用Java编写ActiveMQ的队列模式和主题模式

队列模式的消息演示 本小节简单演示一下如何使用JMS接口规范连接ActiveMQ,首先创建一个Maven工程,在pom.xml文件中,添加activemq的依赖: <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version

学习ActiveMQ(六):JMS消息的确认与重发机制

当我们发送消息的时候,会出现发送失败的情况,此时我们需要用到activemq为我们提供了消息重发机制,进行消息的重新发送.那么我们怎么知道消息有没有发送失败呢?activemq还有消息确认机制,消费者在接收到消息的时候可以进行确认.本节将确认机制和重发机制一起在原有的代码中学习. 消息确认机制有四种:定义于在session对象中 AUTO_ACKNOWLEDGE= 1 :自动确认 CLIENT_ACKNOWLEDGE= 2:客户端手动确认 UPS_OK_ACKNOWLEDGE= 3: 自动批量确