ActiveMQ_点对点队列(二)

一、本文章包含的内容

1、列举了ActiveMQ中通过Queue方式发送、消费队列的代码(普通文本、json/xml字符串、对象数据)

2、spring+activemq方式

二、配置信息

1、activemq的pom.xml信息


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

<!--activemq  Begin-->

       <dependency>

           <groupId>org.springframework</groupId>

           <artifactId>spring-jms</artifactId>

           <version>${spring.version}</version>

       </dependency>

       <!-- <dependency>

            <groupId>org.springframework</groupId>

            <artifactId>spring-messaging</artifactId>

            <version>${spring.version}</version>

        </dependency>-->

       <dependency>

           <groupId>org.apache.activemq</groupId>

           <artifactId>activemq-all</artifactId>

           <version>5.14.0</version>

       </dependency>

       <!--activemq  End-->

2、activemq的配置文件:spring-jms.xml


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

<!-- 启用spring mvc 注解 -->

   <context:component-scan base-package="org.soa.test.activemq"/>

   <!-- 配置JMS连接工厂 -->

   <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">

       <property name="brokerURL" value="failover:(tcp://192.168.146.129:61616)" />

       <!--解决接收消息抛出异常:javax.jms.JMSException: Failed to build body from content. Serializable class not available to broke-->

       <property name="trustAllPackages" value="true"/>

       <!-- 是否异步发送 -->

       <property name="useAsyncSend" value="true" />

   </bean>

   <!--   Queue模式 Begin -->

   <!-- 定义消息队列(Queue) -->

   <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">

       <!-- 设置消息队列的名字 -->

       <constructor-arg>

           <value>defaultQueueName</value>

       </constructor-arg>

   </bean>

   <!-- 配置JMS模板,Spring提供的JMS工具类,它发送、接收消息。(Queue) -->

   <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

       <property name="connectionFactory" ref="connectionFactory" />

       <property name="defaultDestination" ref="queueDestination" />

       <property name="pubSubDomain" value="false"/>

       <!--接收超时时间-->

       <!--<property name="receiveTimeout" value="10000" />-->

   </bean>

   <!--   Queue模式 End -->

三、队列发送端及测试程序

1、发送代码


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

package org.soa.test.activemq.queues;

import org.soa.test.activemq.StudentInfo;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

import org.springframework.stereotype.Component;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import java.util.List;

/**

 * Created by JamesC on 16-9-22.

 */

@Component

public class ProduceMsg {

    @Autowired

    private JmsTemplate jmsTemplate;

    /**

     * 向指定队列发送消息

     */

    public void sendMessage(Destination destination, final String msg) {

        System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg);

        jmsTemplate.send(destination, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {

                return session.createTextMessage(msg);

            }

        });

    }

    /**

     * 向默认队列发送消息(默认队列名称在bean:queueDestination配置)

     */

    public void sendMessage(final String msg) {

        //queue://queue1

        String destination = jmsTemplate.getDefaultDestination().toString();

        System.out.println("向队列" + destination + "发送了消息------------" + msg);

        jmsTemplate.send(new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {

                return session.createTextMessage(msg);

            }

        });

    }

    /**

     * 向默认队列发送消息

     */

    public void sendMessageConvertAndSend(final Object msg) {

        String destination = jmsTemplate.getDefaultDestination().toString();

        System.out.println("向队列" + destination + "发送了消息------------" + msg);

        //使用内嵌的MessageConverter进行数据类型转换,包括xml(JAXB)、json(Jackson)、普通文本、字节数组

        jmsTemplate.convertAndSend(destination, msg);

    }

    /**

     * 向指定队列发送消息

     */

    public void sendStudentInfo(Destination destination, final StudentInfo msg) {

        System.out.println("向队列" + destination.toString() + "发送了消息------------" + msg);

        jmsTemplate.send(destination, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {

                return session.createObjectMessage(msg);

            }

        });

    }

}

2、测试程序


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

package org.soa.test.activemq.queues;

import com.alibaba.fastjson.JSON;

import org.apache.activemq.command.ActiveMQQueue;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.soa.test.activemq.StudentInfo;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.ApplicationContext;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.Destination;

import java.util.Date;

//http://www.coderli.com/junit-spring-test-applicationcontext/

/**

 * Created by JamesC on 16-9-22.

 */

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration("/spring-jms.xml")

public class ProduceMsgTest extends AbstractJUnit4SpringContextTests {

    @Autowired

    protected ApplicationContext ctx;

    /**

     * 队列名queue1  这里使用jms配置文件中的数据

     */

    @Autowired

    private Destination queueDestination;

    /**

     * 队列消息生产者

     */

    @Autowired

    private ProduceMsg produceMessage;

    //向默认队列发消息(文本)

    @Test

    public void produceMsg_DefaultQueue() {

        String msg = "这里是向默认队列发送的消息" + new Date().toString();

        produceMessage.sendMessage(msg);

    }

    //向默认队列发消息(Json字符串)

    @Test

    public void produceMsg_Json() {

        StudentInfo info = new StudentInfo();

        info.setId(1);

        info.setStdName("李磊");

        info.setStdNo("001");

        info.setEnterDate(new Date());  //队列存放的是时间戳

        String alibabaJson = JSON.toJSONString(info);

        produceMessage.sendMessage(alibabaJson);

    }

    //向默认队列发消息(使用convertAndSend发送对象)

    @Test

    public void produceMsg_ConvertAndSend() {

        StudentInfo info = new StudentInfo();

        info.setId(1);

        info.setStdName("李磊");

        info.setStdNo("001");

        info.setEnterDate(new Date());

        produceMessage.sendMessageConvertAndSend(info);

    }

    //向指定队列发消息(文本)

    @Test

    public void produceMsg_CustomQueue() {

        for (int i = 0; i < 20; i++) {

            ActiveMQQueue myDestination = new ActiveMQQueue("queueCustom");

            produceMessage.sendMessage(myDestination, "----发送消息给queueCustom");

        }

    }

    //向指定队列发消息(队列名称从XML读取)

    @Test

    public void produceMsg_XmlQueue() {

        for (int i = 0; i < 20; i++) {

            ActiveMQQueue destinationQueue = (ActiveMQQueue) applicationContext.getBean("queueDestination");

            produceMessage.sendMessage(destinationQueue, "----send my msg to queueXml");

        }

    }

    //向指定队列发消息(发送对象)

    @Test

    public void produceMsg_StudentInfo() {

        StudentInfo info = new StudentInfo();

        info.setId(1);

        info.setStdName("李磊");

        info.setStdNo("001");

        info.setEnterDate(new Date());

        ActiveMQQueue destination = new ActiveMQQueue("StudentInfo");

        produceMessage.sendStudentInfo(destination, info);

    }

}

四、队列消费端及测试程序

1、消费代码


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

package org.soa.test.activemq.queues;

import org.soa.test.activemq.StudentInfo;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.support.JmsUtils;

import org.springframework.stereotype.Component;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.ObjectMessage;

import javax.jms.TextMessage;

/**

 * Created by JamesC on 16-9-22.

 */

@Component

public class ConsumeMsg {

    @Autowired

    private JmsTemplate jmsTemplate;

    /**

     * 接受消息

     */

    public String receive(Destination destination) {

        TextMessage tm = (TextMessage) jmsTemplate.receive(destination);

        String msg = "";

        try {

            msg = tm.getText();

            System.out.println("从队列" + destination.toString() + "收到了消息:\t" + msg);

        } catch (JMSException e) {

            e.printStackTrace();

            return "";

        }

        return msg;

    }

    /**

     * 接受消息

     */

    public StudentInfo receiveStudentInfo() {

        try {

            String destination = jmsTemplate.getDefaultDestination().toString();

            ObjectMessage msg=(ObjectMessage)jmsTemplate.receive(destination);

            return (StudentInfo)msg.getObject();

        } catch (JMSException e) {

            //检查性异常转换为非检查性异常

            throw JmsUtils.convertJmsAccessException(e);

        }

    }

    /**

     * 接受消息

     */

    public Object receiveConvertAndReceive() {

        String destination = jmsTemplate.getDefaultDestination().toString();

        Object msg = jmsTemplate.receiveAndConvert(destination);

        return msg;

    }

}

2、测试程序


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

package org.soa.test.activemq.queues;

import org.apache.activemq.command.ActiveMQQueue;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.soa.test.activemq.StudentInfo;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.test.context.ContextConfiguration;

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**

 * Created by JamesC on 16-9-22.

 */

@RunWith(SpringJUnit4ClassRunner.class)

@ContextConfiguration("/spring-jms.xml")

public class ConsumeMsgTest {

    @Autowired

    private ConsumeMsg consumeMsg;

    //从指定队列接收消息(文本)

    @Test

    public void receiveMsg() {

        //没有消息阻塞一段时间后会抛异常

        //java.lang.NullPointerException

        ActiveMQQueue destination = new ActiveMQQueue("defaultQueueName");

        consumeMsg.receive(destination);

    }

    //从指定队列接收消息(StudentInfo对象消息)

    @Test

    public void receiveStudentInfo() {

        StudentInfo msg = consumeMsg.receiveStudentInfo();

        System.out.println(msg.getStdName());

    }

    //从指定队列接收消息(Json对象)

    @Test

    public void receiveConvertAndReceive() {

        StudentInfo msg =(StudentInfo) consumeMsg.receiveConvertAndReceive();

        System.out.println(msg.getStdName());

    }

}

来自为知笔记(Wiz)

时间: 2024-10-10 23:00:15

ActiveMQ_点对点队列(二)的相关文章

数据结构实验之栈与队列二:一般算术表达式转换成后缀式

数据结构实验之栈与队列二:一般算术表达式转换成后缀式 Description 对于一个基于二元运算符的算术表达式,转换为对应的后缀式,并输出之. Input 输入一个算术表达式,以‘#’字符作为结束标志. Output 输出该表达式转换所得到的后缀式. Sample Input a*b+(c-d/e)*f# Output ab*cde/-f*+ #include <stdio.h> #include <stdlib.h> char s[100005]; //分配栈的大小 int m

ActiveMQ笔记之点对点队列(Point-to-Point)

1. 点对点通信 点对点是一种一对一通信方式,更像是有一个队列,一个人往队列里放消息,另一个人从队列中取消息,其最大的特点是一个消息只会被消费一次,即使有多个消费者同时消费,他们消费的也是不同的消息. 2. 简单实现 添加依赖 添加Maven依赖: <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <dependency> <groupId>org.apache.ac

IPC----消息队列二.函数接口

消息队列可以认为是一个消息链表,System V 消息队列使用消息队列标识符标识.具有足够特权的任何进程都可以往一个队列放置一个消息,具有足够特权的任何进程都可以从一个给定队列读出一个消息.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达.System V 消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除.可以将内核中的某个特定的消息队列画为一个消息链表,如下图所示: 对于系统中每个消息队列,内核维护一个msqid_ds的信

一起talk C栗子吧(第二十三回:C语言实例--队列二)

各位看官们,大家好,上一回中咱们说的是队列及其特点,并且通过例子来说明队列,这一回咱们继续说 队列,不过咱们说的是链式存储形式的队列,这与上一回中顺序存储形式的队列不一样.闲话休提,言归 正转.让我们一起talk C栗子吧! 在代码中通过链表来实现队列的链式存储.而且定义了一个头结点,头结点主要用来保存队列的头部和尾 部信息,以及队列的长度信息.我们对队列的操作,本质上是对链表进行操作,队列中的结点可以看作是 链表中的结点,对队列进行入列(EnQueue)和出列(DeQueue)的操作,可以看作

洛谷OJ 2216 理想的正方形 单调队列(二维)

https://www.luogu.org/problem/show?pid=2216 题意:给出a*b矩形 从中找到一个n*n正方形,其(最大值-最小值之差)最小,a,b<=1e3,n<=100暴力枚举正方形右下角,如何快速算出其最大值和最小值?先用单调队列预处理出ma[i][j] 表示(i,j)以第i行j列结尾长度为n的最大值/在枚举列之后,对同一个列,由于已经知道该列 每行长度为n的最值 则在次利用单调队列,从上往下扫描行,求出(i,j)为右下角的矩形的最值即可 #include <

学习算法 - 优先级队列二叉堆实现

PriorityQuenue 优先队列就是作业调度类的ADT,这里用二叉堆来实现. 优先队列最少有两个操作:插入(Insert)和删除最小者(DeleteMin). 插入操作图解: 图片来源:www.educity.cn 删除操作图解: watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvYWxwczE5OTI=/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEas

消息队列二三事

最近在看kafka的代码,就免不了想看看消息队列的一些要点:服务质量(QOS).性能.扩展性等等,下面一一探索这些概念,并谈谈在特定的消息队列如kafka或者mosquito中是如何具体实现这些概念的. 服务质量 服务语义 服务质量一般可以分为三个级别,下面说明它们不同语义. At most once 至多一次,消息可能丢失,但绝不会重复传输. 生产者:完全依赖底层TCP/IP的传输可靠性,不做特殊处理,所谓"发送即忘".kafka中设置acks=0. 消费者:先保存消费进度,再处理消

数据结构和算法之栈和队列二:栈的压入,弹出序列

当时我在学习这个的时候也是非常不理解这个问题,一个栈的压入和弹出序列的判断一看不就知道了么,还去判断干嘛.只要符合后进先出的规则就行.但是我在这里简单说一下这个压入和弹出序列是怎么回事.就是我们给定假设的两个序列,一个为压入序列,一个为弹出序列.然后我们再通过一个辅助的栈,把压入序列的数据一个一个push()进入临时的辅助栈中,如果栈顶元素刚好和弹出序列的数据一样,那么我们就弹出,如果不一样我们就将压入序列的数据继续压入临时栈中,直到到达序列结束.如果压入序列结束,临时栈全部数据弹出那么就是一个

转载:数据结构 二项队列

0)引论 左堆的合并,插入,删除最小的时间复杂度为O(logN).二项队列就是为了对这些结果进一步提高的一种数据结构.利用二项队列,这三种操作的最坏时间复杂度为O(logN),但是插入的平均时间复杂度为O(1). 1)二项队列 二项队列不是一棵树,它是一个森林,由一组堆序的树组成的深林,叫做二项队列. 二项队列有几个性质比较重要 (a) 每一颗树都是一个有约束的堆序树,叫做二项树 (b) 高度为k的第k个二项树Bk由一个根节点和B0, B1, .......B(k-1)构成 (c) 高度为k的二