ActiveMQ(09):ActiveMQ消息存储持久化

一、简介

1.1 描述

ActiveMQ不仅支持persistent和non-persistent两种方式,还支持消息的恢复(recovery)方式、重新投递等

1.2 PTP与PUB/SUB

1.2.1 PTP

对于持久化订阅主题,每一个消费者将获得一个消息的复制。

1.2.2 PUB/SUB

对于持久化订阅主题,每一个消费者将获得一个消息的复制。

1.3 有效的消息存储

ActiveMQ提供了一个插件式的消息存储,类似于消息的多点传播,主要实现了如下几种:

1:AMQ消息存储-基于文件的存储方式,是以前的默认消息存储

2:KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式

3:JDBC消息存储-消息基于JDBC存储的

4:Memory 消息存储-基于内存的消息存储

二、持久化

2.1 KahaDB消息存储

1、KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

在:activemq/conf/activemq.xml中可以查看

2、KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。在Kaha中,数据被追加到data logs中。当不再需要log文

件中的数据的时候,log文件会被丢弃。

3、KahaDB基本配置例子:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

可用的属性有:

1:director:KahaDB存放的路径,默认值activemq-data

2:indexWriteBatchSize: 批量写入磁盘的索引page数量,默认值1000

3:indexCacheSize:内存中缓存索引page的数量,默认值10000

4:enableIndexWriteAsync:是否异步写出索引,默认false

5:journalMaxFileLength:设置每个消息data log的大小,默认是32MB

6:enableJournalDiskSyncs:设置是否保证每个没有事务的内容,被同步写入磁盘,JMS持久化的时候需要,默认为true

7:cleanupInterval:在检查到不再使用的消息后,在具体删除消息前的时间,默认30000

8:checkpointInterval:checkpoint的间隔时间,默认5000

9:ignoreMissingJournalfiles:是否忽略丢失的消息日志文件,默认false

10:checkForCorruptJournalFiles:在启动的时候,将会验证消息文件是否损坏,默认false

11:checksumJournalFiles:是否为每个消息日志文件提供checksum,默认false

12:archiveDataLogs: 是否移动文件到特定的路径,而不是删除它们,默认false

13:directoryArchive:定义消息已经被消费过后,移动data log到的路径,默认null

14:databaseLockedWaitDelay:获得数据库锁的等待时间 (used by shared master/slave),默认10000

15:maxAsyncJobs:设置最大的可以存储的异步消息队列,默认值10000,可以和concurrentMessageProducers 设置成一样的值

16:concurrentStoreAndDispatchTransactions:是否分发消息到客户端,同时事务存储消息,默认true

17:concurrentStoreAndDispatchTopics:是否分发Topic消息到客户端,同时进行存储,默认true

18:concurrentStoreAndDispatchQueues:是否分发queue消息到客户端,同时进行存储,默认true

2.2 AMQ消息存储

1、AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来

进行消息交互的。

2、这种方式中,Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Date logs由一些单独的data log文件组成,缺省的

文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消

息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。

3、配置示例

<broker brokerName="broker" persistent="true" useShutdownHook="false">
    <persistenceAdapter>
        <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb"/>
    </persistenceAdapter>
</broker>

2.3 JDBC消息存储

ActiveMQ支持使用JDBC来持久化消息

2.3.1 预定义的表

1:消息表,缺省表名为ACTIVEMQ_MSGS,queue和topic都存在里面,结构如下:

2:ACTIVEMQ_ACKS表存储持久订阅的信息和最后一个持久订阅接收的消息ID,结构如下:

3:锁定表,缺省表名为ACTIVEMQ_LOCK,用来确保在某一时刻,只能有一个ActiveMQ broker实例来访问数据库 ,结构如下:

2.3.2 配置activemq/conf/activemq.xml

<persistenceAdapter>
    <!--
        <kahaDB directory="${activemq.data}/kahadb"/>
    -->
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
...
...
</broker>
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://192.168.91.4:3306/db_activemq?useUnicode=true&amp;characterEncoding=UTF-8" />
    <property name="username" value="root" />
    <property name="password" value="123456"/>
</bean>

注意:此处需要上传数据库驱动包到/opt/activemq/lib下,我这里连接池用的是阿里的,所以还要上传druid包

重启activemq

测试发送3条队列消息:

补充:JDBC Message Store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。 配置将persistenceAdapter注释掉,再上persistenceFactory,示例如下:

<!--
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
-->
<persistenceFactory>
    <journalPersistenceAdapterFactory 
        journalLogFiles="4"
        journalLogFileSize="32768"
        useJournal="true"
        useQuickJournal="true"
        dataSource="#mysql-ds"
        dataDirectory="activemq-data" />
</persistenceFactory>

JDBC Store和JDBC Message Store with ActiveMQ Journal的区别:

1:Jdbc with journal的性能优于jdbc

2:Jdbc用于master/slave模式的数据库分享

3:Jdbc with journal不能用于master/slave模式

4:一般情况下,推荐使用jdbc with journal

2.4 Memory消息存储

内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以你必须注意设置你的broker所在的JVM和内存限制。

配置示例:

<broker ... persistent="false" ...></broker>
时间: 2024-10-21 15:10:28

ActiveMQ(09):ActiveMQ消息存储持久化的相关文章

activemq的高级特性:消息存储持久化

activemq的高级特性之消息存储持久化 有基于文件的,数据库的,内存的.默认的是基于文件的,在安装目录/data/kahadb.在conf/activemq.xml文件中. <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb"/> </persistenceAdapter> 把存储持久化换成mysql的数据库. 1:修改配置文件 <persistenceAdapte

ActiveMq笔记2-消息持久化

为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制. ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB, 无论使用哪种持久化方式,消息的存储逻辑都是一致的.也就是说发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件.内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试.消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去. 1.Ka

【分布式系列之ActiveMq】ActiveMq入门示例

前言 github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01 下载ActiveMQ :http://activemq.apache.org/download.html 放到自己的目录,大致目录如下: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志文件 docs存放的是说明文档 examples存放的是简单的实例 lib存放的是activemq所需jar包 webapps用于存放项目的

ActiveMQ消息存储持久化

--------------------------------------------------------------------------------------------------- ActiveMQ提供了插件式的消息存储,主要有有如下几种: 1.AMQ消息存储-基于文件的存储方式,是以前的默认消息存储 2.KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式 3.JDBC消息存储-消息基于JDBC存储的 4.Memory消息存储-基于内存的消息存储 下面就分别

ActiveMQ 消息存储持久化

ActiveMQ中对于投递模式设置为持久化的消息,broker接收到到消息之后,会先把消息存储到存储介质,然后再转发到消息的监听者 ActiveMQ持久化方式:AMQ.KahaDB.JDBC.LevelDB 持久化配置路径:ActiveMQ\apache-activemq\conf\activemq.xml 官方文档: http://activemq.apache.org/persistence.html Message保存方式 PERSISTENT:保存到磁盘,consumer消费之后,mes

ActiveMQ(3)---ActiveMQ原理分析之消息持久化

持久化消息和非持久化消息的存储原理 正常情况下,非持久化消息是存储在内存中的,持久化消息是存储在文件中的.能够存储的最大消息数据在${ActiveMQ_HOME}/conf/activemq.xml文件中的systemUsage节点 ,SystemUsage配置设置了一些系统内存和硬盘容量 <systemUsage> <systemUsage> <memoryUsage> //该子标记设置整个ActiveMQ节点的“可用内存限制”.这个值不能超过ActiveMQ本身设置

ActiveMQ(2)---ActiveMQ原理分析之消息发送

持久化消息和非持久化消息的发送策略 消息同步发送和异步发送 ActiveMQ支持同步.异步两种发送模式将消息发送到broker上.同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理.这个机制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能 异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高.但是可能会出现消息丢失的情况.所以使用异步发送的前提是在某些情况下允许出现数据丢失的情况. 默认情况下,非

[ActiveMQ]初识ActiveMQ

初识ActiveMQ ActiveMQ介绍 官方网站:http://activemq.apache.org/ 最新版本:ActiveMQ 5.14.1(2016-10-28) 最新版本下载链接:http://activemq.apache.org/download.html 历史版本下载链接:http://archive.apache.org/dist/activemq/ Getting Started Guide链接:http://activemq.apache.org/getting-sta

activemq下activemq.bat不能启动

今天下载了一个apache-activemq-5.5.0-bin.rar解压缩后双击/bin目录下的activemq.bat批处理文件发现启动窗口一闪而过无法启动,最后找到原因是因为在环境变量-系统变量中classpath.JAVA_HOME.Path这些系统变量中没有添加jdk相关的安装路径. classpath下添加“.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar” JAVA_HOME下添加 “C:\Program Files\Java\jdk1.6.