ActiveMQ 消息存储

深入浅出 JMS(四) - ActiveMQ 消息存储

 一、消息的存储方式

ActiveMQ 支持 JMS 规范中的持久化消息与非持久化消息

  • 持久化消息通常用于不管是否消费者在线,它们都会保证消息会被消费者消费。当消息被确认消费后,会从存储中删除
  • 非持久化消息通常用于发送通知以及实时数据,通常要求性能优先,消息可靠性并不是必须的情况
  • MQ 支持可插拔式的消息存储,如:内存、文件和关系数据库等方式

Queue 消息模型在 ActiveMQ 的存储:

  采用存储采用先进先出(FIFO),一个消息只能被一个消费者消费,当消息被确认消费之后才会被删除。

Topic消息模型(针对持久订阅):

  每个订阅者获取的消息实际是消息的一个副本,只有一个消息副本会被存储,MQ 提供了一个指针来指向消息存储并且分发消息副本到订阅者,消息直到所有的持久化订阅者都被接收才能被删除。

持久化存储方式:

  • KahaDB 消息存储
  • AMQ 消息存储
  • JDBC 消息存储
  • 内存消息存储

二、KahaDB 存储方式

  KahaDB 是从 ActiveMQ 5.4 开始默认的持久化插件。KahaDb 恢复时间远远小于其前身 AMQ 并且使用更少的数据文件,所以可以完全代替 AMQ,kahaDB 的持久化机制同样是基于日志文件,索引和缓存。

(一)KahaDB 主要特性:

  • 日志形式存储消息;
  • 消息索引以 B-Tree 结构存储,可以快速更新;
  • 完全支持 JMS 事务;
  • 支持多种恢复机制;

(二)适用场景:

高吞吐量的应用程序
存储大数据量的消息

(三)配置方式 conf/activemq.xml:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

(四)KahaDB 存储原理

当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,如果消息及时被确认,就不需要写入到磁盘。写入到磁盘中的数据消息,在后续的消息活动中,如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。

(1) KahaDB 内部结构

  • Data logs:消息日志包含了消息日志和一些命令
  • Cache:当有活动消费者时,用于临时存储,消息会被发送给消费着,同时被安排将被存储,如果消息及时被确认,这不需要写入到磁盘
  • Btree indexes(消息索引):用于引用消息日志(message id),它存储在内存中,这样能快速定位到。MQ会定期将内存中的消息索引保存到 metadata store 中,避免大量消息未发送时,消息索引占用过多内存空间。
  • Redo log:用于在非正常关机情况下维护索引完整性。

(2) 目录结构

  • Db log files:用于存储消息(默认大小32M),当 log 日志满了,会创建一个新的,当 log 日志中的消息都被删除,该日志文件会被删除或者归档。
  • Archive directory:当 datalog 不在被 kahadb 需要会被归档(通过 archiveDataLogs 属性控制)。
  • Db.data:存放 Btree indexs。
  • Db.redo:存放 redo file,用于恢复 Btree indexs。

三、AMQ 消息存储

写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。每个日志文件的大小都是有限制的(默认32m,可自行配置)。当超过这个大小,系统会重新建立一个文件。当所有的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。主要的缺点是 AMQ Message 会为每一个 Destination 创建一个索引,如果使用了大量的 Queue,索引文件的大小会占用很多磁盘空间。而且由于索引巨大,一旦 Broker 崩溃,重建索引的速度会非常慢。

特点:类似 KahaDB,也包含了事务日志,每个 destination 都包含一个 index 文件,AMQ 适用于高吞吐量的应用场景,但是不适合多个队列的场景。

配置方式 conf/activemq.xml:

<!--AMQ directory:数据存储路径 syncOnWrite:是否同步写入  maxFileLength:日志文件大小 -->
<persistenceAdapter>
    <amqPersistenceAdapter
        directory="${activemq.data}/AMQdb"
        syncOnWrite="true"
        maxFileLength="10mb" />
</persistenceAdapter>

(1) AMQ内部结构

  • Data logs:消息日志包含了消息日志
  • Cache:用于消息的快速检索
  • Reference store indexes:用于引用 datalogs 中的消息,通过 message ID 关联

(2) 目录结构

  • Lock:保证同一时间只有一个 borker 访问文件目录
  • temp-storag:用于存储非持久化消息(当不在被存储在内存中),如等待慢消费者处理消息
  • Kr-store:用于存储引用消息日志数据
  • journal directory:包含了消息文件、消息日志和消息控制信息
  • Archive:归档的数据日志

四、JDBC存储

支持通过 JDBC 将消息存储到关系数据库,性能上不如文件存储,能通过关系型数据库查询到消息的信息。

MQ 支持的数据库:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。

存储表结构:

表 1:ACTIVEMQ_MSGS:用于存储消息,Queue 和 Topic 都存储在这个表中:

字段 说明
ID 自增的数据库主键
CONTAINER 消息的Destination
MSGID_PROD 消息发送者客户端的主键
MSG_SEQ 是发送消息的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION 消息的过期时间,存储的是从1970-01-01到现在的毫秒数
MSG 消息本体的Java序列化对象的二进制数据
PRIORITY 优先级,从0-9,数值越大优先级越高

表 2:ACTIVEMQ_ACKS:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

字段 说明
CONTAINER 消息的Destination
SUB_DEST 如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID 每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME 订阅者名称
SELECTOR 选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID 记录消费过的消息的ID

表 3:ACTIVEMQ_LOCK(消息锁,保证同一时间只能有一个broker访问这些表结构):

表 activemq_lock 在集群环境中才有用,只有一个 Broker 可以获得消息,称为 Master Broker,其他的只能作为备份等待 Master Broker 不可用,才可能成为下一个 Master Broker。这个表用于记录哪个 Broker 是当前的 Master Broker。

配置方式:

1、配置数据源 conf/acticvemq.xml 文件:

<!-- 配置数据源-->
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="111111"/>
    <property name="maxActive" value="200"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

2、配置 broke 中的 persistenceAdapter

dataSource 指定持久化数据库的 bean,createTablesOnStartup 是否在启动的时候创建数据表,默认值是 true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为 true,之后改成 false。

<!-- JDBC配置 -->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"  createTablesOnStartup="false"/>
</persistenceAdapter> 

ps:数据库 activemq 需要手动创建。

五、内存消息存储

内存消息存储,会将所有的持久化消息存储在内存中,必须注意JVM使用情况以及内存限制,适用于一些能快速消费的数据量不大的小消息,当MQ关闭或者宕机,未被消费的内存消息会被清空。

配置方式 设置 broker属性值 persistent="false":

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="false"/>

每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/Soprano/p/10659576.html

时间: 2024-10-17 00:32:37

ActiveMQ 消息存储的相关文章

ActiveMQ(09):ActiveMQ消息存储持久化

一.简介 1.1 描述 ActiveMQ不仅支持persistent和non-persistent两种方式,还支持消息的恢复(recovery)方式.重新投递等 1.2 PTP与PUB/SUB 1.2.1 PTP 对于持久化订阅主题,每一个消费者将获得一个消息的复制. 1.2.2 PUB/SUB 对于持久化订阅主题,每一个消费者将获得一个消息的复制. 1.3 有效的消息存储 ActiveMQ提供了一个插件式的消息存储,类似于消息的多点传播,主要实现了如下几种: 1:AMQ消息存储-基于文件的存储

ActiveMQ消息存储持久化

--------------------------------------------------------------------------------------------------- ActiveMQ提供了插件式的消息存储,主要有有如下几种: 1.AMQ消息存储-基于文件的存储方式,是以前的默认消息存储 2.KahaDB消息存储-提供了容量的提升和恢复能力,是现在的默认存储方式 3.JDBC消息存储-消息基于JDBC存储的 4.Memory消息存储-基于内存的消息存储 下面就分别

ActiveMQ 消息存储持久化

ActiveMQ中对于投递模式设置为持久化的消息,broker接收到到消息之后,会先把消息存储到存储介质,然后再转发到消息的监听者 ActiveMQ持久化方式:AMQ.KahaDB.JDBC.LevelDB 持久化配置路径:ActiveMQ\apache-activemq\conf\activemq.xml 官方文档: http://activemq.apache.org/persistence.html Message保存方式 PERSISTENT:保存到磁盘,consumer消费之后,mes

g boot、Dubbo、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to

ActiveMQ消息队列介绍(转)

ActiveMQ是一个开源兼容Java Message Service (JMS) 1.1面向消息的中件间. 来自Apache Software Foundation. ActiveMQ提供松耦合的应用程序架构. 先来看两个应用通过RPC通讯的紧耦合: 通过面向消息的中件间, 架构演变为: 我们看到应用程序1发送message到中件间, 应用程序2从中件间接收message. ActiveMQ提供了灵活的应用程序架构. ActiveMQ消息存储也是FIFO: 什么时候使用ActiveMQ: 1.

java高级视频课程Dubbo、Redis、ActiveMQ、Nginx、Mycat、Spring、MongoDB、ZeroMQ、Git、Nosql、Jvm、Mecached、Netty、Nio、Mina

* { font-family: "Microsoft YaHei" !important } h1 { color: #FF0 } 15套java架构师.集群.高可用.高可扩 展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布 式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat

java架构师负载均衡、高并发、nginx优化、tomcat集群、异步性能优化、Dubbo分布式、Redis持久化、ActiveMQ中间件、Netty互联网、spring大型分布式项目实战视频教程百度网盘

15套Java架构师详情 * { font-family: "Microsoft YaHei" !important } h1 { background-color: #006; color: #FF0 } 15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程包含: 高级Java架构师包含:Spring boot.Spring  clo

ActiveMQ(13):ActiveMQ的集群

一.简介 1.1 消费者集群(Queue consumer clusters) ActiveMQ支持Consumer对消息高可靠性的负载平衡消费,如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上. 如果一个Consumer获得消息比其它Consumer快,那么他将获得更多的消息. 因此推荐ActiveMQ的Broker和Client使用failover://transport的方式来配置链接. 1.2 Broker clusters 大部情况下是使用一系列的B

Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战视频教程

15套java架构师.集群.高可用.高可扩展.高性能.高并发.性能优化.Spring boot.Redis.ActiveMQ.Nginx.Mycat.Netty.Jvm大型分布式项目实战视频教程 视频课程内容包含: 高级Java架构师包含:Spring boot.Spring  cloud.Dubbo.Redis.ActiveMQ.Nginx.Mycat.Spring.MongoDB.ZeroMQ.Git.Nosql.Jvm.Mecached.Netty.Nio.Mina.性能调优.高并发.to