“丢失”的消息
有这样的场景, broker1和 broker2通过 networkConnector连接,一些 consumers连接到 broker1,
消费 broker2上的消息。消息先被 broker1从 broker2上消费掉,然后转发给这些 consumers。不幸的是转
发部分消息的时候 broker1重启了,这些 consumer发现 broker1连接失败,通过 failover连接到 broker2
上去了,但是有一部分他们还没有消费的消息被 broker2已经分发到了 broker1上去了。这些消息,就好
像是消失了,除非有消费者重新连接到 broker1上来消费。怎么办呢?
从5.6版起, destinationPolicy上新增了选项 replayWhenNoConsumers。这个选项使得 broker1
上有需要转发的消息但是没有消费者时,把消息回流到它原始的 broker。同时需要把 enableAudit设置为
false,以防止消息回流后被当做重复消息而不被分发。示例如下:
<!-- Broker destination policy for the "lost message" scenario described above. -->
<!-- queue=">" uses the ActiveMQ wildcard, so the entry applies to every queue. -->
<!-- replayWhenNoConsumers="true" lets a broker that holds forwarded messages but has no local -->
<!-- consumer send them back over the network bridge to the broker they came from. -->
<!-- enableAudit="false" turns off duplicate detection so replayed messages are dispatched again -->
<!-- instead of being treated as duplicates. -->
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
代码如下:
package test.mq.staitsnetwork; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] args) throws JMSException, InterruptedException { ConnectionFactory ConnectionFactory=new ActiveMQConnectionFactory( "tcp://192.168.145.100:61616" ); Connection connection=ConnectionFactory.createConnection(); connection.start(); Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE); Destination destination=session.createQueue("my_queue"); MessageProducer Producer=session.createProducer(destination); for(int i=0;i<30;i++){ TextMessage message=session.createTextMessage("message----"+i); //Thread.sleep(1000); Producer.send(message); } session.commit(); session.close(); connection.close(); } }
消费者1
package test.mq.staitsnetwork; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver1{ public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory( "tcp://192.168.145.100:61676" ); for(int i=0;i<30;i++){ Thread t=new MyThread(connectionFactory); t.start(); try { Thread.sleep(1000l); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class MyThread extends Thread{ private ConnectionFactory connectionFactory=null; public MyThread(ConnectionFactory connectionFactory){ this.connectionFactory = connectionFactory; } public void run(){ try { final Connection connection = connectionFactory.createConnection(); connection.start(); Enumeration names=connection.getMetaData().getJMSXPropertyNames(); final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createQueue("my_queue"); MessageConsumer Consumer=session.createConsumer(destination); Consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { TextMessage txtmsg=(TextMessage) msg; try { System.out.println("接收信息1--->"+txtmsg.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
消费者2
package test.mq.staitsnetwork; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver2{ public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory=new ActiveMQConnectionFactory( "tcp://192.168.145.100:61616" ); for(int i=0;i<30;i++){ Thread t=new MyThread2(connectionFactory); t.start(); try { Thread.sleep(1000l); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class MyThread2 extends Thread{ private ConnectionFactory connectionFactory=null; public MyThread2(ConnectionFactory connectionFactory){ this.connectionFactory = connectionFactory; } public void run(){ try { final Connection connection = connectionFactory.createConnection(); connection.start(); Enumeration names=connection.getMetaData().getJMSXPropertyNames(); final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination=session.createQueue("my_queue"); MessageConsumer Consumer=session.createConsumer(destination); Consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message msg) { TextMessage txtmsg=(TextMessage) msg; try { System.out.println("接收信息2--->"+txtmsg.getText()); } catch (JMSException e1) { e1.printStackTrace(); } try { session.commit(); } catch (JMSException e) { e.printStackTrace(); } try { session.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
运行结果:
消费者1
消费者2
原文地址:https://www.cnblogs.com/caoyingjielxq/p/9359744.html
时间: 2024-11-05 22:54:08