分布式-信息方式-ActiveMQ静态网络连接信息回流功能

“丢失”的消息
有这样的场景, broker1和 broker2通过 netwoskconnector连接,一些 consumers连接到 broker1,
消费 broker2上的消息。消息先被 broker1从 broker2上消费掉,然后转发给这些 consumers。不幸的是转
发部分消息的时候 broker1重启了,这些 consumer发现 broker1连接失败,通过 failover连接到 broker2
上去了,但是有一部分他们还没有消费的消息被 broker2已经分发到了 broker1上去了。这些消息,就好
像是消失了,除非有消费者重新连接到 broker1上来消费。怎么办呢?
从5.6版起,在 destinationPolicy上新增的选项 replayWhenNoConsumers。这个选项使得 broker1
上有需要转发的消息但是没有消费者时,把消息回流到它原始的 broker,同时把 enableAudit设置为
false,为了防止消息回流后祓当做重复消息而不被分发,示例如下:

<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

代码如下:

package test.mq.staitsnetwork;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
       public static void main(String[] args) throws JMSException, InterruptedException {
        ConnectionFactory   ConnectionFactory=new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61616"
                );
        Connection connection=ConnectionFactory.createConnection();
        connection.start();

        Session session=connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
        Destination destination=session.createQueue("my_queue");
        MessageProducer Producer=session.createProducer(destination);

        for(int i=0;i<30;i++){
             TextMessage message=session.createTextMessage("message----"+i);
                //Thread.sleep(1000);
                Producer.send(message);
        }
         session.commit();
         session.close();
         connection.close();
    }
}

消费者1

package test.mq.staitsnetwork;

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver1{

    public static void main(String[] args) throws JMSException {
        ConnectionFactory   connectionFactory=new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61676"
                );
        for(int i=0;i<30;i++){
            Thread    t=new MyThread(connectionFactory);
            t.start();
            try {
                Thread.sleep(1000l);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }
}
class MyThread extends Thread{
         private ConnectionFactory   connectionFactory=null;
         public  MyThread(ConnectionFactory   connectionFactory){
         this.connectionFactory = connectionFactory;
         }
       public void run(){
            try {
                final Connection  connection = connectionFactory.createConnection();
                connection.start();
                Enumeration names=connection.getMetaData().getJMSXPropertyNames();

                final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination=session.createQueue("my_queue");
                MessageConsumer Consumer=session.createConsumer(destination);
                Consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message msg) {
                    TextMessage     txtmsg=(TextMessage) msg;
                    try {
                        System.out.println("接收信息1--->"+txtmsg.getText());
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    }
                });

            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

       }
    }

消费者2

package test.mq.staitsnetwork;

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver2{

    public static void main(String[] args) throws JMSException {
        ConnectionFactory   connectionFactory=new ActiveMQConnectionFactory(
                "tcp://192.168.145.100:61616"
                );
        for(int i=0;i<30;i++){
            Thread    t=new MyThread2(connectionFactory);
            t.start();
            try {
                Thread.sleep(1000l);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }
}
class MyThread2 extends Thread{
         private ConnectionFactory   connectionFactory=null;
         public  MyThread2(ConnectionFactory   connectionFactory){
         this.connectionFactory = connectionFactory;
         }
       public void run(){
            try {
                final Connection  connection = connectionFactory.createConnection();
                connection.start();
                Enumeration names=connection.getMetaData().getJMSXPropertyNames();

                final Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                Destination destination=session.createQueue("my_queue");
                MessageConsumer Consumer=session.createConsumer(destination);
                Consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message msg) {
                    TextMessage     txtmsg=(TextMessage) msg;
                    try {
                        System.out.println("接收信息2--->"+txtmsg.getText());
                    } catch (JMSException e1) {
                        e1.printStackTrace();
                    }
                    try {
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                    try {
                        session.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    }
                });

            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

       }
    }

运行结果:

消费者1

消费者2

原文地址:https://www.cnblogs.com/caoyingjielxq/p/9359744.html

时间: 2024-08-29 20:29:16

分布式-信息方式-ActiveMQ静态网络连接信息回流功能的相关文章

分布式-信息方式-ActiveMQ结合Spring

ActiveMQ结合 Spring开发■ Spring提供了对JMS的支持,需要添加 Spring支持jms的包,如下: <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency> <dependency> <groupId>org.springframework

在数据库连接配置文件之前对连接信息进行解密并连接

技术交流群:233513714 1.如果数据库连接密码没有加密的时候连接的配置方式是 <bean id="anteaterDs" class="org.apache.commons.dbcp.BasicDataSource"> <property name="driverClassName" value="${jdbc.driverClassName}"/> <property name=&quo

android 获取网络连接信息

效果图: 工具类 /** * 获取网络连接信息 * * 根据NetworkInfo可以知道有很多的连接方式和信息 * * ① 当没有任何可用网络的时候,networkinfo为null 判断networkinfo是否为null * * ② 当只有wifi网络或者wifi网络和移动网络同时存在的时候,返回wifi网络连接信息 * * NetworkInfo参数如下: * * detailedState:CONNECTED(连接状态) * * extraInfo:yiteng1(wifi网络名称)

【我的Linux,我做主!】实战--使用netstat监控网络连接信息

目录:(一)netstat简介(二)netstat语法指南(三)实战演练(四)netstat小结 (一)netstat简介(1.1)在Internet的RFC标准中,netstat的定义是:netstat是在内核中访问网络连接状态及相关信息的程序,它能提供TCP连接.在TCP和UDP监听.进程内存管理的相关报告.netstat是控制台命令,是一个监控TCP/IP网络的非常有用的工具,它可以显示路由表.实际的网络连接以及每一个网络接口设备的状态信息.netstat用于显示IP.TCP.UDP和IC

hadoop源码解读namenode高可靠:HA;web方式查看namenode下信息;dfs/data决定datanode存储位置

点击browserFilesystem,和命令查看结果一样 当我们查看hadoop源码时,我们看到hdfs下的hdfs-default.xml文件信息 我们查找${hadoop.tmp.dir}这是引用变量,肯定在其他文件有定义,在core-default.xml中查看到,这两个配置文件有个共同点: 就是不要修改此文件,但可以复制信息到core-site.xml和hdfs-site.xml中修改 usr/local/hadoop 是我存放hadoop文件夹的地方 几个关于namenode的重要文

使用Dom4j解析包含有DB连接信息的XML文件以及节点属性的获取

包含DB连接信息的XML文件 1 <!--示例1——三级显示--> 2 <db-connections> 3 <connection> 4 <name>DBTest</name> 5 <jndi></jndi> 6 <url> 7 <![CDATA[jdbc:mysql://localhost:3306/db_test?useUnicode=true&characterEncoding=UTF8]

SpringMVC使用隐式jdbc连接信息

本地测试环境中,我们如果使用SpringMVC构建项目时,数据库的"用户名.密码.连接地址"都直接明文写在配置文件中.而一旦项目构建在运行环境的服务器时,对"用户名.密码.连接地址"连接信息进行必要的隐藏,这样更加安全一些,那么如何来做隐式jdbc连接信息呢? 一.隐式jdbc连接信息的可行方案 SpringMVC在构建jdbc连接信息时,一般是在"applicationContext.xml"文件中有如下信息提供给项目的JdbcConnecti

查看Oracle数据库某用户的连接信息

执行以下语句可查出用户TJAMIS_LXF连接信息: select schemaname, osuser, process, machine, port, terminal, program from v$session where username = 'TJAMIS_LXF' order by osuser;

[Oracle]如何获得出现故障时,客户端的详细连接信息

客户坚持说 只是在 每天早上5点才运行下面的语句: select / * + FULL (TAB001_TT01) * / 'TAB001_TT01', count (*) from u01.TAB001_TT01 group by 'TAB001_TT01' 但是根据 Incident 文件的记载,发生时间是在 2017-09-26 10: 44: 50.166 , 客户怀疑 Oracle的数据库出现了其他的问题. 这样调查就跑偏方向了. (因为总所周知的原因,修改了敏感信息) 从下面这句“M