ActiveMQ持久化到mysql实现消息永不丢失

ActiveMQ持久化到mysql实现消息永不丢失

配置

1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml

2.打开activemq-jdbc-performance.xml,在persistenceAdapter节点后面添加dataSource="#mysql-ds"

并配置你的数据库

其实可以直接更改apache-activemq-5.15.2/conf/activemq.xml的persistenceAdapter节点.配置下数据库也是可以的,用

用activemq-jdbc-performance.xml没有localhost:8161的管理页面,并且只能用openwire传输协议,默认是全开的,transportConnectors节点为开启的传输协议

3.把activemq-jdbc-performance.xml复制到apache-activemq-5.15.2/conf目录下,从命名为activemq.xml,覆盖原来的activemq.xml

4.在对应的数据库创建activemq库,然后重启ActiveMQ

我们这里用debug模式启动,提示没有mysql的jar包

5.我们在apache-activemq-5.15.2/lib下面添加mysql的jar包,再次启动,就不会报错了

6.这时可以看到刚才创建的activemq库多了三张表,说明配置成功了

点对点测试

生产者

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer {
         public static void main(String[] args) {
//                   String user = ActiveMQConnection.DEFAULT_USER;
//                   String password = ActiveMQConnection.DEFAULT_PASSWORD;
//                   String url = ActiveMQConnection.DEFAULT_BROKER_URL;
                   String subject = "test.queue";
                   ConnectionFactory contectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");
                //   ConnectionFactory contectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
                   try{
                            Connection connection = contectionFactory.createConnection();
                            connection.start();
                            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                            Destination destination = session.createQueue(subject);
                            MessageProducer producer = session.createProducer(destination);
                         //   producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置为持久化
                            for(int i = 0; i < 20;) {
                                     TextMessage createTextMessage = session.createTextMessage("这是要发送的第"+ ++i +"条消息消息");
                                     producer.send(createTextMessage);
                                     System.out.println("第"+ i +"条消息已发送");
                            }
                            Thread.sleep(2000);
                            session.commit();
                            session.close();
                            connection.close();
                   }catch (JMSException e) {
                      //      e.printStackTrace();
                   }catch (InterruptedException e) {
                       //     e.printStackTrace();
                   }

         }

}

消费者

import java.util.Date;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Customer {

    public static void main(String[] args) {

//        String user = ActiveMQConnection.DEFAULT_USER;
//
//        String password = ActiveMQConnection.DEFAULT_PASSWORD;
//
//        String url = ActiveMQConnection.DEFAULT_BROKER_URL;

        String subject = "test.queue";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");
      //  ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");

        Connection connection;

        try {
            connection= connectionFactory.createConnection();

            connection.start();

            final Session session =connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createQueue(subject);

            MessageConsumer message = session.createConsumer(destination);

            message.setMessageListener(new MessageListener() {
                public void onMessage(Message msg){
                    TextMessage message = (TextMessage) msg;
                    try {
                        System.out.println("--收到消息:" +new Date()+message.getText());
                        session.commit();
                    }catch(JMSException e) {
                 //       e.printStackTrace();
                    }

                }

            });
//            Thread.sleep(30000);
//
//            session.close();
//
//            Thread.sleep(30000);
//
//            connection.close();
//
//            Thread.sleep(30000);

        }catch(Exception e) {
        //    e.printStackTrace();
        }

    }

}

这时生产者生产数据,消费者一直不在线,数据就会持久化到数据库的activemq_msgs表,就算ActiveMQ的服务挂了,再次启动后,等消费者在线了就可以再次获取生产者生产的数据(消费之后数据库的数据会自动删除),达到不丢失的效果

原文地址:https://www.cnblogs.com/AngeLeyes/p/8991719.html

时间: 2024-10-27 18:46:14

ActiveMQ持久化到mysql实现消息永不丢失的相关文章

Apache ActiveMQ 持久化到MySQL数据库的简单配置

1.  前言 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 关于ActiveMQ 的详细介绍请参考:http://baike.baidu.com/view/433374.htm?fr=aladdin 关于JMS的详细介绍请参考:http://baike.baidu.com/su

Apache ActiveMQ 持久化到MySQL数据库

1.  前言 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMSProvider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 关于ActiveMQ 的详细介绍请参考:http://baike.baidu.com/view/433374.htm?fr=aladdin 关于JMS的详细介绍请参考:http://baike.baidu.com/sub

activemq持久化之mysql配置

参考:http://blog.csdn.net/zbw18297786698/article/details/52999940 说明:192.168.3.81 CentOS release 6.5 apache-activemq-5.10.1 1.拷贝mysql驱动器到activemq环境下 mysql-connector-java-5.1.44-bin.jar [[email protected] apache-activemq-5.10.1]# ls /home/apache-activem

JMS消息持久化,将ActiveMQ消息持久化到mySql数据库中

ActiveMQ5.8.0版本采用kahadb作为默认的消息持久化方式.使用默认的持久化机制,我们不容易直接看到消息究竟是如何持久的.ActiveMQ提供的JDBC持久化机制,能够将持久化信息存储到数据库.通过查看数据库中ActiveMQ生成的表结构和存储的数据,能够帮助我们更好的了解消息的持久化机制.现在介绍如何配置activemq,将数据持久化到mysql中. 1.配置activeMQ需要的mySql数据源 为了能够使用JDBC访问mysql数据库,显然必须要配置消息服务器的数据库源.在ac

ActiveMQ持久化消息

ActiveMQ的另一个问题就是只要是软件就有可能挂掉,挂掉不可怕,怕的是挂掉之后把信息给丢了,所以本节分析一下几种持久化方式: 一.持久化为文件 ActiveMQ默认就支持这种方式,只要在发消息时设置消息为持久化就可以了. 打开安装目录下的配置文件: D:\ActiveMQ\apache-activemq\conf\activemq.xml在越80行会发现默认的配置项: <persistenceAdapter> <kahaDB directory="${activemq.da

ActiveMQ持久化及测试(转)

转:http://blog.csdn.net/xyw_blog/article/details/9128219 ActiveMQ持久化 消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据

ActiveMQ持久化方式

消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消息中心启动以后首先要检查制

ActiveMQ持久化方式(转)

消息持久性对于可靠消息传递来说应该是一种比较好的方法,有了消息持久化,即使发送者和接受者不是同时在线或者消息中心在发送者发送消息后宕机了,在消息 中心重新启动后仍然可以将消息发送出去,如果把这种持久化和ReliableMessaging结合起来应该是很好的保证了消息的可靠传送. 消息持久性的原理很简单,就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送 给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消息中心启动以后首先要检

关于MQ的几件小事(四)如何保证消息不丢失

1.mq原则 数据不能多,也不能少,不能多是说消息不能重复消费,这个我们上一节已解决:不能少,就是说不能丢失数据.如果mq传递的是非常核心的消息,支撑核心的业务,那么这种场景是一定不能丢失数据的. 2.丢失数据场景 丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了.下面从rabbitmq和kafka分别说一下,丢失数据的场景, (1)rabbitmq A:生产者弄丢了数据 生产者将数据发送到rabbitmq的时候,可能在传输过程中因为网络等问题而将数据弄丢了. B:rabbit