JMS学习四(ActiveMQ消息过滤)

消息的过期、消息的选择器和消息的优先级。

一、消息的过期

允许消息过期 。默认情况下,消息永不会过期。如果消息在特定周期内失去意义,那么可以设置过期时间。 
有两种方法设置消息的过期时间,时间单位为毫秒: 
1.使用消息生产者的setTimeToLive 方法为所有的消息设置过期时间。
2.使用消息生产者的send 方法为每一条消息设置过期时间。
 消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值。如果 timeToLive 值等于零,则 JMSExpiration 被设为零, 表示该消息永不过期。

3、消息服务器接收到消息后,在指定的时间后,会从队列中移除指定的消息,超时被移除的消息不会发送给消费者。

4、使用消息生产者的setTimeToLive(long time ) 方法来给所有的消息设置过期时间:

// 消息生产者
MessageProducer producer = null;
producer = session.createProducer(queue);
// 消息是否为持久性的,如果不设置默认是持久化的。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
 //消息过期设置
producer.setTimeToLive(1000);  

5、使用消息生产者的send()方法来设置消息的过期时间

//message发送的消息,deliveryMode是否持久化,priority优先级,timeToLive 消息过期时间
//producer.send(message, deliveryMode, priority, timeToLive);
producer.send(message, DeliveryMode.PERSISTENT, 4, 1000); 

这里在插一段吧,上面设置消息过期的都是消息生产者这方的来设置的,也就是如果不满足条件则消息服务器会把消息从消息队列中删除,但是我们也可以在消息消费端来设置接受时间(仅限于同步接受)

Message message = consumer.receive(2);  

就是在接受的时候添加等待时间(单位是毫秒)如果在指定的时间内获取不到消息则不会再等了。如果不设置等待时间则一直等待直到接收到消息或超时为止。

二、消息的选择器

不管是在消息发送端设置消息过期时间还是在接收端设置等待时间,都是对不满足的消息有过滤的作用,那消息选择器就是为过滤消息而生的下面来看看消息选择器:

ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。 消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer 创建的一部分。

1、消息生产者:

package mqtest3;  

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnectionFactory;  

public class Producer {
    // 单例模式
    // 1、连接工厂
    private ConnectionFactory connectionFactory;
    // 2、连接对象
    private Connection connection;
    // 3、Session对象
    private Session session;
    // 4、生产者
    private MessageProducer messageProducer;  

    public Producer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 设置自动签收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }  

    }  

    public Session getSession() {
        return this.session;
    }  

    public void send1(/* String QueueName, Message message */) {
        try {  

            Destination destination = this.session.createQueue("first");
            MapMessage msg1 = this.session.createMapMessage();
            msg1.setString("name", "张三");
            msg1.setInt("age", 20);
            // 设置用于消息过滤器的条件
            msg1.setStringProperty("name", "张三");
            msg1.setIntProperty("age", 20);
            msg1.setStringProperty("color", "bule");  

            MapMessage msg2 = this.session.createMapMessage();
            msg2.setString("name", "李四");
            msg2.setInt("age", 25);
            // 设置用于消息过滤器的条件
            msg2.setStringProperty("name", "李四");
            msg2.setIntProperty("age", 25);
            msg2.setStringProperty("color", "white");  

            MapMessage msg3 = this.session.createMapMessage();
            msg3.setString("name", "赵六");
            msg3.setInt("age", 30);
            // 设置用于消息过滤器的条件
            msg3.setStringProperty("name", "赵六");
            msg3.setIntProperty("age", 30);
            msg3.setStringProperty("color", "black");
            // 发送消息
            this.messageProducer.send(destination, msg1,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg2,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg3,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }  

    public void send2() {
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage message = this.session.createTextMessage("我是一个字符串");
            message.setIntProperty("age", 25);
            // 发送消息
            this.messageProducer.send(destination, message,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }  

    }  

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.send1();
        // producer.send2();  

    }
}  

2、消息消费者:

package mqtest3;  

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;  

public class Conmuser {
    // 单例模式
    // 1、连接工厂
    private ConnectionFactory connectionFactory;
    // 2、连接对象
    private Connection connection;
    // 3、Session对象
    private Session session;
    // 4、生产者
    private MessageConsumer messageConsumer;
    // 5、目的地址
    private Destination destination;
    // 消息选择器
    public final String SELECTOR_1 = "age > 25";
    public final String SELECTOR_2 = " age > 20 and color=‘black‘";  

    public Conmuser() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 设置自动签收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            // 在构造消费者的时候,指定了 消息选择器
            // 有选择性的消费消息
            this.messageConsumer = this.session.createConsumer(destination,
                    SELECTOR_1);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }  

    public Session getSession() {
        return this.session;
    }  

    // 用于监听消息队列的消息
    class MyLister implements MessageListener {  

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage ret = (TextMessage) message;
                    System.out.println("results;" + ret.getText());
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage) message;
                    System.out.println(ret.toString());
                    System.out.println(ret.getString("name"));
                    System.out.println(ret.getInt("age"));
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }  

    }  

    // 用于异步监听消息
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new MyLister());
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }  

    public static void main(String[] args) {
        Conmuser conmuser = new Conmuser();
        conmuser.receiver();  

    }
}  

上面的demo是对MapMessage和TextMessage两种消息的过滤条件的设置和消费,过滤条件的设置使在消息的属性中设置,而消费消息的时候直接是在session创建MessageConsumer时传入的参数即过滤条件(过滤条件的写法和SQL的写法是很像的)

在写过滤条件的时候要注意设置的是什么类型的条件即: int 、string 如果是int 则加引号而如果是String则要加哦!!!

三、消息的优先级

通常,可以确保将单个会话向目标发送的所有消息按其发送顺序传送至消费者。然而,如果为这些消息分配了不同的优先级,消息传送系统将首先尝试传送优先级较高的消息。 
有两种方法设置消息的优先级: 
1.使用 setPriority 方法,这样所有的消息都采用此传送模式; 
2.使用 send 方法为每一条消息设置传送模式; 
消息优先级从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。如果不指定优先级,则默认为 4。JMS 不要求严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。

原文地址:https://www.cnblogs.com/alter888/p/8975356.html

时间: 2024-08-03 12:52:31

JMS学习四(ActiveMQ消息过滤)的相关文章

JMS学习(七)-ActiveMQ消息的持久存储方式之KahaDB存储

一,介绍 自ActiveMQ5.4以来,KahaDB成为了ActiveMQ默认的持久化存储方式.相比于原来的AMQ存储方式,官方宣称KahaDB使用了更少的文件描述符,并且提供了更快的存储恢复机制. 二,KahaDB存储配置 在 conf/activemq.xml 中配置如下: <broker brokerName="broker" ... > <persistenceAdapter> <kahaDB directory="activemq-da

JMS学习(八)-ActiveMQ Consumer 使用 push 还是 pull 获取消息

ActiveMQ是一个消息中间件,对于消费者而言有两种方式从消息中间件获取消息: ①Push方式:由消息中间件主动地将消息推送给消费者:②Pull方式:由消费者主动向消息中间件拉取消息.看一段官网对Push方式的解释: To be able to achieve high performance it is important to stream messages to consumers as fast as possible so that the consumer always has a

ActiveMQ消息过滤

前言 ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤.生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣.这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销.然而,它也使得处理选择标准的消息服务增加了一些额外开销. 消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息.消息选择器是一些字符串,它们基于某

JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系

一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery),默认情况下使用的是持久传输. 可以通过MessageProducer 类的 setDeliveryMode方法设置传输模式: MessageProducer producer = ...; producer.setDeliveryMode(DeliveryMode.PERSISTENT); 持

JMS学习(六)-ActiveMQ的高可用性实现

一,ActiveMQ高可用性的架构 ActiveMQ的高可用性架构是基于Master/Slave 模型的.ActiveMQ总共提供了四种配置方案来配置HA,其中Shared Nothing Master/Slave 在5.8版本之后不再使用了,并在ActiveMQ5.9版本中引入了基于Zookeeper的Replicated LevelDB Store HA方案. 二,Master/Slave架构的配置解释 ①Shared Nothing Master/Slave   该架构最大的特点是: 1)

JavaWeb-过滤器Filter学习(四)敏感词过滤实例

通过Filter来实现留言板的敏感词过滤- 思路很简单,我们这里的敏感词是直接先放进去的,实际项目中,肯定是存在数据库中.在Filter 过滤器中,我们先拿到用户提交的留言,如果出现了敏感词,我们就用*号来替换. 代码演示: index.jsp: <%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%> <%@taglib uri="htt

JMS学习(四) Selector详解

一.前言 在掌握了消息的结构之后,我们接下来看一下JMS的一个重要功能:选择器.有些时候,作为消费者只希望处理自己感兴趣的消息.如果某个消息只有一个消费者,我们可以在让该客户端根据规则来处理自己感兴趣的消息,那些不满足某些规则的就直接替丢弃掉. 但如果消息是广播的机制,那么让每个客户端都去做这样的处理,就加大了客户端的工作量,一种更好的方式,就是由消息提供者来完成消息的过滤和路由工作,这样就能大减轻客户端的工作量,消费者在真正处理的时候,完全不用关注对消息的过滤,可以只负责对消息的处理.也就是说

jQuery学习之路(四)之过滤选择器

今天是第四课主要学习过滤选择器 <!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>lesson4 过滤选择器</title> <link rel="stylesheet" type="text/css" href="css/lesson.css" ></link>

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac