ActiveMQ 配置jdbc主从

使用 jdbc 方式配置主从模式,持久化消息存放在数据库中。

在同一时刻,只有一个 master broker,master 接受客户端的连接,slave 不接受连接。
当 master 因为关机而下线后,其中一个 slave 会提升为 master,然后接受客户端连接。但原来 master 的非持久消息丢失了,而持久消息保存在数据库中。

broker xml 配置:使用 MySQL 数据源

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!--
    Use JDBC for message persistence
    For more information, see:

    http://activemq.apache.org/persistence.html

    You need to add Derby database to your classpath in order to make this example work.
    Download it from http://db.apache.org/derby/ and put it in the ${ACTIVEMQ_HOME}/lib/optional/ folder
    Optionally you can configure any other RDBM as shown below

    To run ActiveMQ with this configuration add xbean:conf/activemq-jdbc.xml to your command

    e.g. $ bin/activemq xbean:conf/activemq-jdbc.xml
 -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

  <broker useJmx="false" brokerName="jdbcBroker" xmlns="http://activemq.apache.org/schema/core">
    <persistenceAdapter>
       <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
    </persistenceAdapter>

    <transportConnectors>
       <transportConnector name="default" uri="tcp://0.0.0.0:61616"/>
    </transportConnectors>
  </broker>

  <!-- Embedded Derby DataSource Sample Setup -->
 <!--  <bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
    <property name="databaseName" value="derbydb"/>
    <property name="createDatabase" value="create"/>
  </bean> -->

  <!-- Postgres DataSource Sample Setup -->
  <!--
  <bean id="postgres-ds" class="org.postgresql.ds.PGPoolingDataSource">
    <property name="serverName" value="localhost"/>
    <property name="databaseName" value="activemq"/>
    <property name="portNumber" value="0"/>
    <property name="user" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="dataSourceName" value="postgres"/>
    <property name="initialConnections" value="1"/>
    <property name="maxConnections" value="10"/>
  </bean>
  -->

  <!-- MySql DataSource Sample Setup -->

  <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://192.168.40.8:3306/db_zhang?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>

  <!-- Oracle DataSource Sample Setup -->
  <!--
  <bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>
    <property name="url" value="jdbc:oracle:thin:@localhost:1521:AMQDB"/>
    <property name="username" value="scott"/>
    <property name="password" value="tiger"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
  </bean>
  -->

</beans>

客户端配置,producer 和 consumer 是一样的:

new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61618)");

以 MySQL 数据库为例,启动 broker 后,MySQL 中会创建 3 张表:
ACTIVEMQ_MSGS 存放持久消息
ACTIVEMQ_LOCK 表中只有一条记录,broker 执行 SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE 获取锁,获得锁的 broker 是 master
ACTIVEMQ_ACKS

broker 获取锁的调用栈如下:

// org.apache.activemq.store.jdbc.DefaultDatabaseLocker
public void doStart() throws Exception {

    LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
    // SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE
    String sql = getStatements().getLockCreateStatement();
    LOG.debug("Locking Query is "+sql);

    while (true) {
        try {
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            lockCreateStatement = connection.prepareStatement(sql);
            // 执行 SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE
            // 如果成功,则跳出循环。如果超时,则抛异常
            lockCreateStatement.execute();
            break;
        } catch (Exception e) {
            try {
                if (isStopping()) {
                    throw new Exception(
                            "Cannot start broker as being asked to shut down. "
                                    + "Interrupted attempt to acquire lock: "
                                    + e, e);
                }
                if (exceptionHandler != null) {
                    try {
                        exceptionHandler.handle(e);
                    } catch (Throwable handlerException) {
                        LOG.error( "The exception handler "
                                + exceptionHandler.getClass().getCanonicalName()
                                + " threw this exception: "
                                + handlerException
                                + " while trying to handle this exception: "
                                + e, handlerException);
                    }

                } else {
                    LOG.debug("Lock failure: "+ e, e);
                }
            } finally {
                // Let‘s make sure the database connection is properly
                // closed when an error occurs so that we‘re not leaking
                // connections
                if (null != connection) {
                    try {
                        connection.rollback();
                    } catch (SQLException e1) {
                        LOG.debug("Caught exception during rollback on connection: " + e1, e1);
                    }
                    try {
                        connection.close();
                    } catch (SQLException e1) {
                        LOG.debug("Caught exception while closing connection: " + e1, e1);
                    }

                    connection = null;
                }
            }
        } finally {
            if (null != lockCreateStatement) {
                try {
                    lockCreateStatement.close();
                } catch (SQLException e1) {
                    LOG.debug("Caught while closing statement: " + e1, e1);
                }
                lockCreateStatement = null;
            }
        }

        LOG.info("Failed to acquire lock.  Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
        try {
            Thread.sleep(lockAcquireSleepInterval);
        } catch (InterruptedException ie) {
            LOG.warn("Master lock retry sleep interrupted", ie);
        }
    }

    LOG.info("Becoming the master on dataSource: " + dataSource);
}

broker 执行 SELECT * FROM ACTIVEMQ_LOCK FOR UPDATE 获取锁,获取成功,则成为 master,如果失败,则睡眠一段时间后,继续获取锁。

超时而抛出的异常:

原文地址:https://www.cnblogs.com/allenwas3/p/8955458.html

时间: 2024-08-29 02:57:20

ActiveMQ 配置jdbc主从的相关文章

配置 JDBC SqlSever2008 R2

最近琢磨怎么把java和数据库结合起来写程序,发现了JDBC这种神奇的东西.网上搜了一些文章,首先是要配置JDBC才能开始使用一些sql的接口,再看了N篇文章并且动手尝试后,不断出错并且debug,终于配置好啦~ 下面就让我分享一下我的配置过程. 一.介绍 什么是JDBC: (来自百度)JDBC(Java Data Base Connectivity,java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成.JDB

weblogic配置jdbc数据源

weblogic配置jdbc数据源的过程 方法/步骤 启动weblogic 管理服务器,使用管理用户登录weblogic管理控制台   打开管理控制台后,在左侧的树形域结构中,选择服务->数据源. 在右侧的窗口中,选择 新建->一般数据源   填写数据源名称,JNDI名,选择数据库类型(本文以Oracle数据库为例) 点击下一步按钮   选择数据库驱动程序,点击下一步按钮   事务处理选项画面,点击 下一步 按钮   新建JDBC数据源画面 数据库名称 主机名 端口号 数据库用户名 密码 用于

centos 安装glassfish4.0 配置jdbc连接mysql

版本glassfish-4.0.zip 1.解压,拷贝到指定安装路径 unzip glassfish-4.0.zip  cp cp glassfish4 /usr/local/ -rf 2.设置glassfish环境变量 vim  /etc/profile export GLASSFISH_HOME=/usr/local/glassfish4 export PATH=.:$PATH:$JAVAHOME/bin:$MYSQLHOME/bin:$GLASSFISH_HOME/bin source /

CentOS安装GlassFish4.0 配置JDBC连接MySQL

转自:http://linux.it.net.cn/CentOS/course/2014/0724/3319.html 版本glassfish-4.0.zip 1.解压,拷贝到指定安装路径   unzip glassfish-4.0.zip cp cp glassfish4 /usr/local/ -rf 2.设置glassfish环境变量 Linux学习,http:// linux.it.net.cn   vim  /etc/profile export GLASSFISH_HOME=/usr

spring中配置jdbc数据源

1.加入jdbc驱动器包,mysql-connector-java.jar 2.加入commons-dbcp.jar配置数据源 3.在classpath下新建文件jdbc.properties,配置jdbc数据源参数 jdbc.driverClassName=com.mysql.jdbc.Driver jdbc.url=jdbc\:mysql\://localhost\:3306/baiyin jdbc.username=root jdbc.password=123456 4.在xml中导入数据

Ubuntu配置Mysql主从数据库

MySQL数据库支持数据库的主从复制功能,因此在集群方面具有其独特的优势.众多国内外大型网站架构体系中,均采用了MySQL的主从数据库配置来实现查询负载.数据库热备等功能.本人在实际的Web项目中也涉及到这一需求,在此将如何配置实现做个简单小结. 本次环境:虚拟机下 服务器:Ubuntu 14.04 LTS数据库: 5.5.37端口:3306主IP:192.168.63.133从IP:192.168.63.134授权账号:user:suxhpassword:111111好了交代完环境:我们直接配

配置jdbc.properties 以及 ConfigManager应用

配置jdbc.propertiesjava.driveClass=com.mysql.jdbc.Driverjdbc.connection.url=jdbc:mysql://127.0.0.1:3306/MySQL(数据库名)jdbc.connection.username=rootjdbc.connection.password=Admin 读取配置文件:使用Properties对象的load()方法来实现配置文件的读取,使用流来实现文件读写的操作. 创建一个工具类: ConfigManage

spring:配置jdbc

前言:我不得不说,在配置spring的jdbc链接上,我犯了很多错,虽然我一再改善,并且寻求更加简洁的解决方案,然而都一直未能最终解决,我必须感谢我的队友,是他轻松的找到了这个解决方案,而我一直苦苦寻找的就是它.spring的占位符,通过简单的context:property-placeholder元素,就可以轻松的配置jdbc链接,但是这是一个曲折的故事,你听我来诉苦,并且得到你想要的答案. 最初的样子 <bean id="dataSource" class="org

tomcat_配置jdbc(小例)

原创作品,出自 "深蓝的blog" 博客,深蓝的blog:http://blog.csdn.net/huangyanlong/article/details/47043975 tomcat下配置jdbc举例 使用tomcat手工配置jdbc时,一般会到应用下面的webapps中的"应用名称\WEB-INF\classes"文件夹中找到jdbc.properties文件. 然后根据样例配置jdbc即可,下面举一个例子:某java程序使用三用户方式访问各用户数据的系统,