ActiveMQ消息过滤

前言

ActiveMQ提供了一种机制,使用它,消息服务可根据消息选择器中的标准来执行消息过滤。生产者可在消息中放入应用程序特有的属性,而消费者可使用基于这些属性的选择标准来表明对消息是否感兴趣。这就简化了客户端的工作,并避免了向不需要这些消息的消费者传送消息的开销。然而,它也使得处理选择标准的消息服务增加了一些额外开销。

消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。可以将消息选择器作为MessageConsumer 创建的一部分。

实现对MapMessage和TextMessage两种消息的过滤条件的设置和消费

Producer

在消息的属性中设置过滤条件

package com.tgb.activemqFilter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
    // 单例模式
    // 1、连接工厂
    private ConnectionFactory connectionFactory;
    // 2、连接对象
    private Connection connection;
    // 3、Session对象
    private Session session;
    // 4、生产者
    private MessageProducer messageProducer;

    public Producer() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 设置自动签收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

    }

    public Session getSession() {
        return this.session;
    }

    public void send1(/* String QueueName, Message message */) {
        try {

            Destination destination = this.session.createQueue("first");
            MapMessage msg1 = this.session.createMapMessage();
            msg1.setString("name", "张三");
            msg1.setInt("age", 20);
            // 设置用于消息过滤器的条件
            msg1.setStringProperty("name", "张三");
            msg1.setIntProperty("age", 20);
            msg1.setStringProperty("color", "bule");

            MapMessage msg2 = this.session.createMapMessage();
            msg2.setString("name", "李四");
            msg2.setInt("age", 25);
            // 设置用于消息过滤器的条件
            msg2.setStringProperty("name", "李四");
            msg2.setIntProperty("age", 25);
            msg2.setStringProperty("color", "white");

            MapMessage msg3 = this.session.createMapMessage();
            msg3.setString("name", "赵六");
            msg3.setInt("age", 30);
            // 设置用于消息过滤器的条件
            msg3.setStringProperty("name", "赵六");
            msg3.setIntProperty("age", 30);
            msg3.setStringProperty("color", "black");
            // 发送消息
            this.messageProducer.send(destination, msg1,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg2,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
            this.messageProducer.send(destination, msg3,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public void send2() {
        try {
            Destination destination = this.session.createQueue("first");
            TextMessage message = this.session.createTextMessage("我是一个字符串");
            message.setIntProperty("age", 25);
            // 发送消息
            this.messageProducer.send(destination, message,
                    DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

    }

    public static void main(String[] args) {
        Producer producer = new Producer();
        producer.send1();
        // producer.send2();

    }
}

Conmuser

消费消息时,直接在session创建MessageConsumer时,将过滤条件作为参数传入(过滤条件的写法和SQL的写法是很像的)

package com.tgb.activemqFilter;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Conmuser {
    // 单例模式
    // 1、连接工厂
    private ConnectionFactory connectionFactory;
    // 2、连接对象
    private Connection connection;
    // 3、Session对象
    private Session session;
    // 4、生产者
    private MessageConsumer messageConsumer;
    // 5、目的地址
    private Destination destination;
    // 消息选择器
    public final String SELECTOR_1 = "age > 25";
    public final String SELECTOR_2 = " age > 20 and color=‘black‘";

    public Conmuser() {
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("admin",
                    "admin", "tcp://127.0.0.1:61616");
            this.connection = connectionFactory.createConnection();
            this.connection.start();
            // 设置自动签收模式
            this.session = this.connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            this.destination = this.session.createQueue("first");
            // 在构造消费者的时候,指定了 消息选择器
            // 有选择性的消费消息
            this.messageConsumer = this.session.createConsumer(destination,
                    SELECTOR_1);
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public Session getSession() {
        return this.session;
    }

    // 用于监听消息队列的消息
    class MyLister implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    TextMessage ret = (TextMessage) message;
                    System.out.println("results;" + ret.getText());
                }
                if (message instanceof MapMessage) {
                    MapMessage ret = (MapMessage) message;
                    System.out.println(ret.toString());
                    System.out.println(ret.getString("name"));
                    System.out.println(ret.getInt("age"));
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

    }

    // 用于异步监听消息
    public void receiver() {
        try {
            this.messageConsumer.setMessageListener(new MyLister());
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Conmuser conmuser = new Conmuser();
        conmuser.receiver();

    }
}

时间: 2024-12-15 12:18:04

ActiveMQ消息过滤的相关文章

JMS学习四(ActiveMQ消息过滤)

消息的过期.消息的选择器和消息的优先级. 一.消息的过期 允许消息过期 .默认情况下,消息永不会过期.如果消息在特定周期内失去意义,那么可以设置过期时间. 有两种方法设置消息的过期时间,时间单位为毫秒: 1.使用消息生产者的setTimeToLive 方法为所有的消息设置过期时间.2.使用消息生产者的send 方法为每一条消息设置过期时间. 消息过期时间,send 方法中的 timeToLive 值加上发送时刻的 GMT 时间值.如果 timeToLive 值等于零,则 JMSExpiratio

ActiveMQ消息队列-单节点测试(点对点模式)

ActiveMQ发送和接收消息的过程和jdbc操作数据库很类似:首先创建Connection连接对象,再获取Session会话对象,之后通过Session会话对象创建Producer.Consumer.Message等对象,只不过ActiveMQ的Connection对象是通过ActiveMQConnectionFactory工厂产生的.以下是一些场景的测试代码. 先定义一些常量数据,这些数据在后面的例子中也有用到 // 用户名 private final static String USERN

消息队列如何利用标签实现消息过滤

场景介绍 一个消息队列(MQ)存储的消息,可以包含不同实际用途.如果这些消息不加区分,消费者每次消费都会按顺序拉取消息,直到完成对所有消息的消费.如果消费者只对某一类型的消息感兴趣,那么将所有消息都消费一遍必会影响消费者处理效率. 解决方案 分布式消息服务DMS是稳定可靠的消息队列服务,提供普通队列.有序队列.Kafka.ActiveMQ.RabbitMQ,兼容HTTP.TCP.AMQP协议,应用于系统解耦.异步通信.流量削峰去谷.第三方集成等场景.DMS提供消息标签的能力,支持生产者为每条消息

2015年12月10日 spring初级知识讲解(三)Spring消息之activeMQ消息队列

基础 JMS消息 一.下载ActiveMQ并安装 地址:http://activemq.apache.org/ 最新版本:5.13.0 下载完后解压缩到本地硬盘中,解压目录中activemq-core-5.13.0.jar,这就是ActiveMQ提供给我们的API. 在bin目录中,找到用于启动ActiveMQ的脚本,运行脚本后ActiveMQ就准备好了,可以使用它进行消息代理. 访问http://127.0.0.1:8161/admin/能看到如下则表示安装成功了. 二.在Spring中搭建消

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

Windows消息过滤

在C#编程中,经常会遇到一些场景,如禁止鼠标拖动窗体,启用某些快捷键,禁止鼠标移动等.遇到这些需求,可以通过窗体的MouseMove事件,OnDragDrop,OnMove等事件来解决问题, 但是该方法有个缺点是,只能在当前窗体或控件上起作用,如果窗体或控件被覆盖,就不起作用了.而我们在开发时经常会碰到一个Form上有很多控件的情形,本节将讲述如何通过捕捉windows消息的方式来实现这个功能. 一般来讲,实现该功能有两种方法, 1. 通过重写WndProc(ref Message m)来实现,

消息过滤

消息过滤的背景 从一个特定的队列或主题接收消息时,您可能会希望由更多的选择.如果没有消息过滤技术,主题订阅者就会接收发布到该主题的每一条消息,而队列接收者也会继续接收下一条消息,并不考虑这些消息的内容或类型. 就主题订阅者来说,它可能要强制处理很多不必要和不想要的消息,这通常导致:编写定制Java代码来人工过滤不想要的消息. 针对队列的消息过滤更有趣,因为它和主题不同:一旦一条消息被一个队列接收者消费以后,对其他所有接收者来说,该队列就不再可用.这意味着,如果一个队列接收者消费了一条消息,并决定

JAVA的设计模式之观察者模式----结合ActiveMQ消息队列说明

1----------------------观察者模式------------------------------ 观察者模式:定义对象间一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知自动更新. activeMQ消息队列 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮

ActiveMQ 消息服务(三)

想象场景: 有一条任务,需要在特定环境下进行.用ActiveMQ 来讲分两步,第一:发送者发布一条消息:第二:接收者接收到这条消息后需要干某些事情. 本文依然直接贴出demo代码! 1.项目结构图: 2.activeMQ的jar包依赖,部分pom.xml文件代码: <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core