ActiveMQ学习第八篇:Messaage

Messaage Properties:

??ActiveMQ支持很多消息属性,具体可以参考 http://activemq.apache.org/activemq-message-properties.html
??常见得一些属性说明:

  1. queue得消息默认是持久化得
  2. 消息得优先级默认是4.
  3. 消息发送时设置了时间戳。
  4. 消息的过期时间默认是永不过期,过期的消息进入DLQ,可以配置DLQ及其处理策略。
  5. 如果消息是重发的,将会被标记出来。
  6. JMSReplyTo标识响应消息发送到哪个queue.
  7. JMSCorelationID标识此消息相关联的消息id,可以用这个标识把多个消息连接起来。
  8. JMS同时也记录了消息重发的次数。默认是6次
  9. 如果有一组相关联的消息需要处理,可以分组;只需要设置消息组的名字和这个消息的第几个消息。
  10. 如果消息中一个事务环境,则TXID将会被设置。
  11. 此外ActiveMQ在服务器端额外设置了消息入队和出队的时间戳。
  12. ActiveMQ里消息属性的值,不仅可以用基本类型,还可以用List或Map类型

    Advisory Message:

    ??Advisory Message是ActiveMQ自身的系统消息地址,可以监听该地址来获取activemq的系统消息。目前支持获取如下信息:

  13. consumers, producers和connections的启动和停止
  14. 创建和销毁temporary destinations
  15. toppics 和queues 的消息过期
  16. brokers发送消息给destination,但是没有consumers
  17. connections启动和停止
    ??说明:
  18. 所有advisory的topic,前缀是:ActiveMQ.Advisory
  19. 所有Advisory的消息类型是:‘Advisory’,所有的Advisory都有的消息属性有:originBrokerId,originBrokerName,originBrokerURL
  20. 具体支持的topic和queue,请参照:
      http://activemq.apache.org/advisory-message.html
    Advisory功能默认是关闭的,打开Advisorie的具体实现如下:
<destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true"/>
        </policyEntries>
            </policyMap>
        </destinationPolicy>

开启之后启动ActiveMQ
查看控制台:
已经可以看到以ActiveMQ.Advisory为前缀的topic了,

监听ActiveMQ.Advisory.Producer.Queue.my-queue实现如下:

package com.wangx.activemq;

import com.wangx.activemq.util.MQUtil;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMessage;

import javax.jms.*;

public class MessageReceiver {

    /**
     * topic名字
     */
    private static final String QUEUENAME = "ActiveMQ.Advisory.Producer.Queue.my-queue";

    public void receive() throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //获取Session
        //创建队列
        Topic queue = session.createTopic(QUEUENAME);
        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //监听生产者信息
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //将类型转换成ActiveMQMessage
                ActiveMQMessage activeMQMessage = (ActiveMQMessage)message;
                try {
                    //打印message
                    System.out.println(activeMQMessage.getMessage());
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public static void main(String[] args) throws JMSException {
        MessageReceiver messageReceiver = new MessageReceiver();
        messageReceiver.receive();
    }
}

Advisory的使用方式:

  1. 要在配置文件里面开启Advisories.
  2. 消息发送端没什么变化,不做多余改变或配置,
  3. 消息接收端:
    ??1)根据你要接收的消息类型,来设置不同的topic,当然也可以使用AdvisorySupport这个类来辅助创建,比如你想要得到消息生产者的信息,你可以:
Topic queue = session.createTopic(QUEUENAME);
        Destination destination = AdvisorySupport.getProducerAdvisoryTopic(queue);

??2)由于这个topic默认不是持久化的,所有要先看起接收端,然后再发送消息。
??3) 接收消息的时候,接收到的消息类型是ActiveMQMessage,所以需要先转换成ActiveMQMessage,然后再通过getDataStructure方法来得到具体的信息对象。
代码如下:

 //将类型转换成ActiveMQMessage
                ActiveMQMessage activeMQMessage = (ActiveMQMessage)message;
                try {
                    //打印message
                    System.out.println(activeMQMessage.getDataStructure());
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }

 这样可以可以拿到相关信息,

Delay and schedule Message Delivery (延迟和定时消息传递)

??有时候我们不希望消息马上被broker投递出去,而是想要消息60s以后发送给消费者,或者是我们想要让消息每隔一段时间投递一次,一共投递指定的次数。。。类似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。
??我们只需要把几个描述消息定时调度的方式参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。当然需要再xml broker中配置schedulerSupport属性为true,
一共四个属性:
    AMQ_SCHEDULED_DELAY: 延迟投递的时间
    AMQ_SCHEDULED_PERIOD: 重复投递的时间间隔
    AMQ_SCHEDULED_REPEAT:重复投递次数
    AMQ_SCHEDULED_CRON: Cron表达式
  ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置,使用例子如下:延迟60s
  在broker上设置schedulerSupport="true",然后使用如下代码设置:

TextMessage textMessage = session.createTextMessage("message" + i);
            long time = 30 * 1000;
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);

??这样,当生产者发送消息之后,消费者不会马上收到消息,而是会等待30s之后才会开始接收消息
  延迟10s,投递3次,间隔5秒的例子

 TextMessage textMessage = session.createTextMessage("message" + i);
            long delay = 10 * 1000;
            long period= 5 * 1000;
            int repeat = 3;
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            messageProducer.send(textMessage);

使用CRON表达式,每个小时发送一次

textMessage.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON,"0 * * * *");

??CRON表达式的优先级高于另外三个参数,如果在设置了延时时间,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔period,就是说设置的是叠加效果,例如每小时都会发生消息被投递10次,延迟0秒开始,每次间隔1秒。

Message Transformation

??有时候需要JMS Producer内部进行message转换,从4.2版本起,ActiveMQ提供了一个Message Transform接口用于进行消息转换,可以在如下对象上调用:

ActiveMQConnectionFactory,ActiveMQConnection,ActiveMQSession,ActiveMQMessageConsumer,ActiveMQMessageProducer.

??在消息被发送之前发送到JMS producer的消息总线前进行转换,通过producerTransform方法,在消息到达总线后,但是在consumer接收消息之前进行转换,通过consumerTransform方法,当然MessageTransfoemer接口的实现需要你自己来提供。

原文地址:https://www.cnblogs.com/yangk1996/p/11667702.html

时间: 2024-11-02 22:07:15

ActiveMQ学习第八篇:Messaage的相关文章

从.Net到Java学习第八篇——SpringBoot实现session共享和国际化

SpringBoot Session共享 修改pom.xml添加依赖 <!--spring session--> <dependency> <groupId>org.springframework.session</groupId> <artifactId>spring-session-data-redis</artifactId> </dependency> 添加配置类RedisSessionConfig @Config

SpringCloud学习第八篇:Stream学习(Greenwich.SR1版本)

一.Stream简介 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互.所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式.通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动.Spring Cloud Stream 为一些供应商的

ActiveMQ学习第六篇:Destination的特性

Wildcards(通配符) Wildcars用来支持名字分层体系,它不是JMS规范的一部分,是ActiveMQ的扩展. ??ActiveMQ支持以下三种通配符: ".":用于作为路径上名字间的分隔符 ">":用于递归的匹配任何以这个名字开始的Destination(目的地) "*":用于作为路径上任何名字. ??举例来说,如有以下两个Destination: ??PRICE.COMPUTER.JD.APPLE(苹果电脑在京东上的价格) ?

ActiveMQ学习第五篇:ActiveMq伪集群学习

启动多实例 # 1.将conf文件夹复制一份 cp -r conf/ conf-1/ #主要是修改conf-1目录activemq.xml # 2.修改Broker名称 <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost-1" dataDirectory="${activemq.data}"> #3.数据存储如果使用的是kahaDB,

ActiveMQ学习第七篇:Message Dispatch(消息发送)的特性

Message Cursors ??ActiveMQ发送持久消息的典型处理方式是:当消息的消费者准备就绪时,消息发送系统把存储的消息按批次发送给消费者,在发送完一个批次的消息后,指针的标记位置指向下-批次待发送消息的位置,进行后续的发送操作.这是一种比较健壮和灵活的消息发送方式,但大多数情况下,消息的消费者不是一直处于这种理想的活跃状态. ??因此,从ActiveMQ5. 0. 0版本开始,消息发送系统采用一种混合型的发送模式,当消息消费者处理活跃状态时,允许消息发送系统直接把持久消息发送给消费

python 学习 第八篇 jquery

简介: jQuery是一个javascript库.极大滴简化了javascript编程. 包含内容: HTML 元素选取 HTML 元素操作 CSS 操作 HTML 事件函数 JavaScript 特效和动画 HTML DOM 遍历和修改 AJAX 1:下载 jQuery 共有两个版本的 jQuery 可供下载:一份是生产版本jQuery.min.js(最小化和压缩过的),另一份是开发版jQuery.js(未压缩的供调试或阅读). 这两个版本都可http://jquery.com/downloa

iOS学习第八篇 ——NSString的使用

IOS字符串的常用方法和使用 NSString 1. NSString的四中创建方法 (1) NSString *str1 = @"方式一"; (2) NSString *str2 = [ [NSString alloc] initWithString:@"方式二"]; (3) NSString *str3 = [ NSString stringWithFormat:@"%@",@"方式三"]; (4) NSString *s

Linux学习第八篇之文件搜索命令find

一.find命令:(Windows搜索小工具推荐——Everything) 命令名称:find 命令所在路径:/bin/find 执行权限:所有用户 语法:find [搜索范围] [匹配条件] 功能描述:文件搜索 二.find命令的例子: 1.find /etc -name init 在目录/etc中查找文件init(会在子目录下的文件继续搜索init),-name 搜索条件的选项,文件名是全匹配的,模糊搜索可用通配符处理,如find /etc -name *init*,如果报find: pat

学习java随笔第八篇:封装、继承、多态

java和c#一样都是面向对象的语言. 面向对象的语言有三大特征:封装.继承.多态 封装 封装:隐藏对象的属性和实现细节,仅对外公开接口,控制在程序中属性的读和修改的访问级别. class Person2 { private String name; public void setName(String name) { this.name=name; } public String getName() { return name; } private String sex; public voi