ActiveMQ笔记之点对点队列(Point-to-Point)

1. 点对点通信

点对点是一种一对一通信方式,更像是有一个队列,一个人往队列里放消息,另一个人从队列中取消息,其最大的特点是一个消息只会被消费一次,即使有多个消费者同时消费,他们消费的也是不同的消息。

2. 简单实现

添加依赖

添加Maven依赖:

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.15.2</version>
</dependency>

activemq.properties

在resource下创建一个activemq.properties,用来保存activemq的用户名、密码、连接地址等信息:

username = root
passwd = toor
url = tcp://47.96.17.190:61616

ActiveMqUtils

创建一个工具类,用来获取连接,因为工厂类一般都是比较重量级的类,不应该重复创建:

package org.cc11001100.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.Properties;

/**
 * @author: CC11001100
 * @date: 2017/11/8 18:20
 * @email: [email protected]
 */
public class ActiveMqUtils {

    private static ConnectionFactory connectionFactory;

    static{
        try {
            Properties properties = new Properties();
            properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream("activemq.properties"));
            connectionFactory=new ActiveMQConnectionFactory(properties.getProperty("username"),
                    properties.getProperty("passwd"),
                    properties.getProperty("url"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取JMS连接
     *
     * @return JMS Connection
     */
    public static Connection getConnection(){
        try {
            return connectionFactory.createConnection();
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }

}

SenderUtils

创建发送消息的工具类:

package org.cc11001100.activemq;

import javax.jms.*;
import java.util.function.Function;

/**
 * @author: CC11001100
 * @date: 2017/11/8 18:12
 * @email: [email protected]
 */
public class SenderUtils {

    /**
     * 向指定的队列发送消息
     *
     * @param queueName 发送到哪个队列
     * @param generateMessage 使用这个方法产生要发送的消息
     */
    public static void send(String queueName, Function<Session, Message> generateMessage){

        Connection conn=null;
        Session session=null;
        MessageProducer messageProducer=null;

        try {
            conn = ActiveMqUtils.getConnection();
            assert conn != null;
            conn.start();
            session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

            /*队列名是区分大小写的,如果不存在的话会自动创建一个*/
            Queue queue=session.createQueue(queueName);
            messageProducer=session.createProducer(queue);
            /*设置非持久化,持久化的意思是要求发送的时候接收方要在线*/
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // 生成消息并发送
            Message message = generateMessage.apply(session);
            messageProducer.send(message);

            /*在提交的时候消息才会真正的发出去*/
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally{

            if(messageProducer!=null){
                try {
                    messageProducer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if(session!=null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if(conn!=null){
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        }

    }

}

注意:在session.commit()之前消息是不会被发送出去的。

ReceiverUtils

创建接收消息的工具类:

package org.cc11001100.activemq;

import javax.jms.*;
import java.util.function.Function;

/**
 * @author: CC11001100
 * @date: 2017/11/8 18:37
 * @email: [email protected]
 */
public class ReceiverUtils {

    /**
     * 从指定队列中接收一个消息
     *
     * @param queueName 队列名称
     * @return 接收到的消息内容
     */
    public static Message receive(String queueName){

        Connection conn=null;
        Session session=null;
        MessageConsumer messageConsumer=null;

        try {
            conn=ActiveMqUtils.getConnection();
            assert conn != null;
            conn.start();
            session=conn.createSession(true,Session.AUTO_ACKNOWLEDGE);

            Queue queue=session.createQueue(queueName);
            messageConsumer=session.createConsumer(queue);

            /*这是一个阻塞式的方法,在接收到消息之前会一直阻塞着*/
            Message message=messageConsumer.receive();
            session.commit();
            return message;
        } catch (JMSException e) {
            e.printStackTrace();
        }finally{

            if(messageConsumer!=null){
                try {
                    messageConsumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if(session!=null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if(conn!=null){
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

        }

        return null;
    }

    /**
     * 从指定队列接收一个消息并将它传递给回调方法处理,返回处理后的结果
     *
     * @param queueName 队列名称
     * @param callback 处理消息的回调方法
     * @param <T> 处理消息后的返回值
     * @return 处理消息后的返回值
     */
    public static <T> T receive(String queueName, Function<Message, T> callback){
        Message message = receive(queueName);
        assert message!=null;
        return callback.apply(message);
    }

}

Main

创建测试类:

package org.cc11001100.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * @author: CC11001100
 * @date: 2017/11/8 18:49
 * @email: [email protected]
 */
public class Main {

    public static void main(String[] args) {

        final String QUEUE_NAME = "FOO_QUEUE";

        // 生产者
        new Thread(()->{

            while(true){

                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                SenderUtils.send(QUEUE_NAME, session -> {
                    try {
                        return session.createTextMessage(Long.toString(System.currentTimeMillis()));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    return null;
                });
            }

        }).start();

        // 消费者
        new Thread(()->{

            while(true){
                ReceiverUtils.receive(QUEUE_NAME, message->{
                    if(message instanceof TextMessage){
                        try {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println(textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                    return message;
                });
            }

        }).start();

    }

}
时间: 2024-08-30 01:32:04

ActiveMQ笔记之点对点队列(Point-to-Point)的相关文章

实战Spring4+ActiveMQ整合实现消息队列(生产者+消费者)

引言: 最近公司做了一个以信息安全为主的项目,其中有一个业务需求就是,项目定时监控操作用户的行为,对于一些违规操作严重的行为,以发送邮件(FoxMail)的形式进行邮件告警,可能是多人,也可能是一个人,第一次是以单人的形式,,直接在业务层需要告警的地方发送邮件即可,可是后边需求变更了,对于某些告警邮件可能会发送多人,这其中可能就会有阻塞发邮件的可能,直到把所有邮件发送完毕后再继续做下边的业务,领导说这样会影响用户体验,发邮件的时候用户一直处于等待状态,不能干别的事情.最后研究说用消息队列,当有需

相克军_Oracle体系_随堂笔记009-检查点队列

1.检查点队列 checkpoint queue RBA 日志块地址 redo block address LRBA 第一次被脏的地址 HRBA 最近一次被脏的地址 on disk rba 重做日志(current redo log)中最后一条日志的地址 数据块里有两个地址,LRBA,HRBA. checkpoint queue 按照脏块第一次脏的时间链起来. checkpoint queue就是按照数据块的LRBA地址链起来的. 2.CKPT进程    每隔3秒钟触发一次        记录检

算法笔记2-优先队列(堆)(上)

一.什么是优先队列? 看一情景:我们去KTV唱歌,点歌的时候,可以发现所点的歌就是一个队列. 这时候,一个MM突然不玩手机了想唱歌,于是她来点歌,并且想尽早轮到她. 于是她可以选择"插歌"这个功能插到前排队列里. 这种具备可以插入优先权元素的队列,就叫优先队列.但是,这个定义不是严谨的. 优先队列的基本模型是这样的-- 具备两个功能: insert插入: deleteMin 删除最小者. 它的工作就是-- 它很有用哦,具体可以用在操作系统,外部排序和贪婪算法中等. 二.怎么实现优先队列

学习ActiveMQ(二):点对点(队列)模式消息演示

一:介绍 点对点的消息发送方式主要建立在 消息(Message ),队列(Queue),发送者(Sender),消费者(receiver)上,Queue 存贮消息,Sender 发送消息,receive接收消息.具体点就是Sender Client通过Queue发送message ,而 receiver Client从Queue中接收消息.消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行. 二:通过jms编码接口之间的关

ActiveMQ_点对点队列(二)

一.本文章包含的内容 1.列举了ActiveMQ中通过Queue方式发送.消费队列的代码(普通文本.json/xml字符串.对象数据) 2.spring+activemq方式 二.配置信息 1.activemq的pom.xml信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 <!--activemq  Begin-->        <dependency>            <groupId>org.springframew

ActiveMq C#客户端 消息队列的使用(存和取)

1.准备工具 VS2013Apache.NMS.ActiveMQ-1.7.2-bin.zipapache-activemq-5.14.0-bin.zip 2.开始项目 VS2013新建一个C#控制台应用程序,项目中添加两个dll引用,一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\lib\Apache.NMS\net-4.0目录下的Apache.NMS.dll,另一个是D:\Apache.NMS.ActiveMQ-1.7.2-bin\build\net-4.0\debug

activeMQ使用入门(点对点消息)

首先创建一个maven工程,在pom文件中增加相关的依赖包,如下: <dependency>    <groupId>javax.jms</groupId>    <artifactId>jms-api</artifactId>    <version>1.1-rev-1</version>    </dependency>    <dependency>    <groupId>org

ActiveMQ的(点对点&amp;发布/订阅通信模式)和(持久化方式)

ActiveMQ的持久化 消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消

ActiveMq笔记2-消息持久化

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB, 无论使用哪种持久化方式,消息的存储逻辑都是一致的.也就是说发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. 1.Ka