JMS-activeMq发布订阅模式(非持久订阅)

Publisher的代码:

  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.DeliveryMode;
  4. import javax.jms.Destination;
  5. import javax.jms.JMSException;
  6. import javax.jms.MapMessage;
  7. import javax.jms.MessageProducer;
  8. import javax.jms.Session;
  9. import javax.jms.TextMessage;
  10. import org.apache.activemq.ActiveMQConnectionFactory;
  11. public class Publisher {
  12. // 单例模式
  13. // 1、连接工厂
  14. private ConnectionFactory connectionFactory;
  15. // 2、连接对象
  16. private Connection connection;
  17. // 3、Session对象
  18. private Session session;
  19. // 4、生产者
  20. private MessageProducer messageProducer;
  21. public Publisher() {
  22. try {
  23. this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",
  24. "123", "tcp://localhost:61616");
  25. this.connection = connectionFactory.createConnection();
  26. this.connection.start();
  27. // 不使用事务
  28. // 设置客户端签收模式
  29. this.session = this.connection.createSession(false,
  30. Session.AUTO_ACKNOWLEDGE);
  31. this.messageProducer = this.session.createProducer(null);
  32. } catch (JMSException e) {
  33. throw new RuntimeException(e);
  34. }
  35. }
  36. public Session getSession() {
  37. return this.session;
  38. }
  39. public void send1(/* String QueueName, Message message */) {
  40. try {
  41. Destination destination = this.session.createTopic("topic1");
  42. MapMessage msg1 = this.session.createMapMessage();
  43. msg1.setString("name", "张三");
  44. msg1.setInt("age", 22);
  45. MapMessage msg2 = this.session.createMapMessage();
  46. msg2.setString("name", "李四");
  47. msg2.setInt("age", 25);
  48. MapMessage msg3 = this.session.createMapMessage();
  49. msg3.setString("name", "张三");
  50. msg3.setInt("age", 30);
  51. // 发送消息到topic1
  52. this.messageProducer.send(destination, msg1,
  53. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  54. this.messageProducer.send(destination, msg2,
  55. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  56. this.messageProducer.send(destination, msg3,
  57. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  58. } catch (JMSException e) {
  59. throw new RuntimeException(e);
  60. }
  61. }
  62. public void send2() {
  63. try {
  64. Destination destination = this.session.createTopic("topic1");
  65. TextMessage message = this.session.createTextMessage("我是一个字符串");
  66. // 发送消息
  67. this.messageProducer.send(destination, message,
  68. DeliveryMode.NON_PERSISTENT, 4, 1000 * 60 * 10);
  69. } catch (JMSException e) {
  70. throw new RuntimeException(e);
  71. }
  72. }
  73. public static void main(String[] args) {
  74. Publisher producer = new Publisher();
  75. producer.send1();
  76. }
  77. }

Subscribe的代码:

  1. import javax.jms.Connection;
  2. import javax.jms.ConnectionFactory;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.MapMessage;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.MessageListener;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. public class Subscriber {
  13. // 单例模式
  14. // 1、连接工厂
  15. private ConnectionFactory connectionFactory;
  16. // 2、连接对象
  17. private Connection connection;
  18. // 3、Session对象
  19. private Session session;
  20. // 4、生产者
  21. private MessageConsumer messageConsumer;
  22. // 5、目的地址
  23. private Destination destination;
  24. public Subscriber() {
  25. try {
  26. this.connectionFactory = new ActiveMQConnectionFactory("zhangsan",
  27. "123", "tcp://localhost:61616");
  28. this.connection = connectionFactory.createConnection();
  29. this.connection.start();
  30. // 不使用事务
  31. // 设置客户端签收模式
  32. this.session = this.connection.createSession(false,
  33. Session.AUTO_ACKNOWLEDGE);
  34. this.destination = this.session.createTopic("topic1");
  35. this.messageConsumer = this.session.createConsumer(destination);
  36. } catch (JMSException e) {
  37. throw new RuntimeException(e);
  38. }
  39. }
  40. public Session getSession() {
  41. return this.session;
  42. }
  43. // 用于监听消息队列的消息
  44. class MyLister implements MessageListener {
  45. @Override
  46. public void onMessage(Message message) {
  47. try {
  48. if (message instanceof TextMessage) {
  49. }
  50. if (message instanceof MapMessage) {
  51. MapMessage ret = (MapMessage) message;
  52. System.out.println(ret.toString());
  53. System.out.println(ret.getString("name"));
  54. System.out.println(ret.getInt("age"));
  55. // 因为设置的是客户端的签收模式,所以要手动的去确认消息的消费
  56. message.acknowledge();
  57. }
  58. } catch (JMSException e) {
  59. throw new RuntimeException(e);
  60. }
  61. }
  62. }
  63. // 用于异步监听消息
  64. public void receiver() {
  65. try {
  66. this.messageConsumer.setMessageListener(new MyLister());
  67. } catch (JMSException e) {
  68. throw new RuntimeException(e);
  69. }
  70. }
  71. public static void main(String[] args) {
  72. Subscriber conmuser = new Subscriber();
  73. conmuser.receiver();
  74. }
  75. }

先启动消费者(先订阅后消费),再启动发布者

时间: 2024-11-02 04:52:10

JMS-activeMq发布订阅模式(非持久订阅)的相关文章

理解JMS规范中的持久订阅和非持久订阅

jms1.1规范针对publisher/subscriber模型提出了持久订阅和非持久订阅者.我们用现实生活中的例子来说明,持久订阅和非持久订阅的区别. 1.非持久订阅 考虑学生听老师讲课的情景,大学老师讲课,一部分学生会去教室听课,另一部分学生会选择逃课在寝室睡觉.开始上课前,老师可能会点名,在教室听课的同学知道这个消息,逃课的同学就不知道这个消息(除非朋友电话通知的情况).即老师发布消息的时候,如果学生在教室就能知道,不在教室就不知道.非持久订阅只有当客户端处于激活状态,也就是和JMS Pr

JMS学习(五)--ActiveMQ中的消息的持久化和非持久化 以及 持久订阅者 和 非持久订阅者之间的区别与联系

一,消息的持久化和非持久化 ①DeliveryMode 这是传输模式.ActiveMQ支持两种传输模式:持久传输和非持久传输(persistent and non-persistent delivery),默认情况下使用的是持久传输. 可以通过MessageProducer 类的 setDeliveryMode方法设置传输模式: MessageProducer producer = ...; producer.setDeliveryMode(DeliveryMode.PERSISTENT); 持

JMS ActiveMQ研究文档

1. 背景 当前,CORBA.DCOM.RMI等RPC中间件技术已广泛应用于各个领域.但是面对规模和复杂度都越来越高的分布式系统,这些技术也显示出其局限性:(1)同步通信:客户发出调用后,必须等待服务对象完成处理并返回结果后才能继续执行:(2)客户和服务对象的生命周期紧密耦合:客户进程和服务对象进程 都必须正常运行:如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户会接收到异常:(3)点对点通信:客户的一次调用只发送给某个单独的目标对象. 面向消息的中间件(Message Oriente

ActiveMQ两种模式PTP和PUB/SUB<转>

1.PTP模型 PTP(Point-to-Point)模型是基于队列(Queue)的,对于PTP消息模型而言,它的消息目的是一个消息队列(Queue),消息生产者每次发送消息总是把消息送入消息队列中,消息消费者总是从消息队列中读取消息.先进队列的消息将先被消息消费者读取. 发送方发消息到队列,接收方从队列接收消息,队列的存在使得消息的异步传输成为可能.和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具管理队列的创建.删除.JMS PTP 模型定义了客户端如何向队列发

关于 ActiveMQ 的消息模式

1.JMS Queue 执行 load balancer语义:一条消息仅能被一个 consumer(消费者) 收到.如果在 message 发送的时候没有可用的consumer,那么它将被保存一直到能处理该 message 的 consumer 可用.如果一个 consumer 收到一条 message 后却不响应它,那么这条消息将被转到另一个consumer 那儿.一个 Queue 可以有很多 consumer,并且在多个可用的 consumer中负载均衡. 点对点消息传递域的特点如下:• 

JMS消息队列ActiveMQ(发布/订阅模式)

消费者1(Consumer)--订阅(subcribe)-->主题(Topic) package com.java1234.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.

ActiveMQ入门系列三:发布/订阅模式

在上一篇<ActiveMQ入门系列二:入门代码实例(点对点模式)>中提到了ActiveMQ中的两种模式:点对点模式(PTP)和发布/订阅模式(Pub & Sub),详细介绍了点对点模式并用代码实例进行说明,今天就介绍下发布/订阅模式. 一.理论基础 发布/订阅模式的工作示意图: 消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息. 和点对点方式不同,发布到topic的消息会被所有订阅者消费. 当生产者发布消息,不管是否有消费者,都不会保存消息. 一定要先

ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)

ActiveMQ简单简绍 MQ简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBMWEBSPHERE MQ. MQ特点: M

ActiveMQ发布订阅模式

ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思.意思就是一个来源分到N个出口.还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者. 生产者: try { //Create the Connection Factory IConnectionFactory factory = new ConnectionFactory("tcp://localhost