消息队列(五)--- RocketMQ-消息存储2

概述

RocketMQ存储中主要用到以下知识点:

  • mmap 文件映射
  • 内存池
  • 异步刷盘
  • consumeQueue
    同时本节将介绍各个重要的类,本篇文章将介绍 mmap 文件映射的相关方法和内存池相关知识点,刷盘和 consumeQueue 相关知识点在下篇介绍。

MappedFile

mappedFile 对应着底层映射文件,主要的功能是

  • bytebuffer写入映射文件
  • 回刷回文件

重要字段

    public static final int OS_PAGE_SIZE = 1024 * 4;
    protected static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    //加了 final,值不能修改
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    //ADD BY ChenYang
    //  先commit 后 flush
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    protected int fileSize;

    protected FileChannel fileChannel;
    /**
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     * 消息先放到这里先,如果此时 writeBuffer 不为 null (此时有东西在写入)那么再次放入 fileChannel
     */
    protected ByteBuffer writeBuffer = null;
    //
    protected TransientStorePool transientStorePool = null;
    private String fileName;
    private long fileFromOffset;
    private File file;
    //虚拟内存映射 buffer
    private MappedByteBuffer mappedByteBuffer;
    private volatile long storeTimestamp = 0;
    private boolean firstCreateInQueue = false;

fileChannel 映射持久化的文件进来,使用原子类纪录 commit 和 flush 的节点。

init 方法

    public void init(final String fileName, final int fileSize,
                     final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }

    /**
     * 初始化最主要就是文件映射
     */
    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;

        ensureDirOK(this.file.getParent());

        try {
            // RandomAccessFile
            // 参见 : https://blog.csdn.net/qq496013218/article/details/69397380
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // fileChannel.map 返回的是堆外内存(java.nio.directByteBuffer)
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("create file channel " + this.fileName + " Failed. ", e);
            throw e;
        } catch (IOException e) {
            log.error("map file " + this.fileName + " Failed. ", e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }

commit 操作。

    /**
     * 1. 判断是否达到commit 的要求
     * 2. 获得锁
     * 3. commit
     * 4. 释放锁
     */
    public int commit(final int commitLeastPages) {
        if (writeBuffer == null) {
            //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
            return this.wrotePosition.get();
        }
        if (this.isAbleToCommit(commitLeastPages)) {
            if (this.hold()) {
                commit0(commitLeastPages);
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
        //TODO  下面是什么操作
        // All dirty data has been committed to FileChannel.
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }

        return this.committedPosition.get();
    }

    /**
     * 使用 JAVA NIO 的 bytebuffer 创建子 buffer
     * 然后写入到 filechannel 中去
     * @param commitLeastPages
     */
    protected void commit0(final int commitLeastPages) {
        int writePos = this.wrotePosition.get();
        int lastCommittedPosition = this.committedPosition.get();

        if (writePos - this.committedPosition.get() > 0) {
            try {
                ByteBuffer byteBuffer = writeBuffer.slice();
                byteBuffer.position(lastCommittedPosition);
                byteBuffer.limit(writePos);
                this.fileChannel.position(lastCommittedPosition);
                this.fileChannel.write(byteBuffer);
                this.committedPosition.set(writePos);
            } catch (Throwable e) {
                log.error("Error occurred when commit data to FileChannel.", e);
            }
        }
    }

    /**
     *  判断是否满了或是达到了最小的 commit 页数
     * @param commitLeastPages 最小commit 页数
     * @return 是否可以 commit
     */
    protected boolean isAbleToCommit(final int commitLeastPages) {
        int flush = this.committedPosition.get();
        int write = this.wrotePosition.get();

        if (this.isFull()) {
            return true;
        }

        if (commitLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
        }

        return write > flush;
    }
FileChannel,MappedByteBuffer:
这两个类代表的是Mmap 这样的内存映射技术,Mmap 能够将文件直接映射到用户态的内存地址,使得对文件的操作不再是 write/read,而转化为直接对内存地址的操作。

Mmap技术本身也有局限性,也就是操作的文件大小不能太大,因此RocketMQ 中限制了单文件大小来避免这个问题。也就是那个filesize定为1G的原因。

flush 回刷到文件中去

    /**
     * @return The current flushed position
     */
    public int flush(final int flushLeastPages) {
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();

                try {
                    //我们只增加数据到 fileChannel 或是 mappedByteBuffer ,从不同时两者一起增加
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }

                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }

    private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();

        if (this.isFull()) {
            return true;
        }

        if (flushLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }

        return write > flush;
    }

其中 writebuffer 和 filechannel 什么情况会刷回磁盘呢?以下这种图回答了这个问题。

同时mappedFile还有预热处理,具体见 warmMappedFile 方法 。


TransientStorePool

该类的主要作用是创建内存池,而且是堆外内存,主要作用是消除了申请内存空间,回收的时间,提高了使用的性能。
字段

    private final int poolSize;
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers;
    private final MessageStoreConfig storeConfig;
public class TransientStorePool {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);

    private final int poolSize;//池的大小有多少,默认5
    private final int fileSize;//每个commitLog文件大小,默认1G
    private final Deque<ByteBuffer> availableBuffers;//双端队列记录可用的buffers
    private final MessageStoreConfig storeConfig;//存储配置

    public TransientStorePool(final MessageStoreConfig storeConfig) {
        this.storeConfig = storeConfig;
        this.poolSize = storeConfig.getTransientStorePoolSize();
        this.fileSize = storeConfig.getMapedFileSizeCommitLog();
        this.availableBuffers = new ConcurrentLinkedDeque<>();
    }

    /**
     * It‘s a heavy init method.
     * 初始化函数,分配poolSize个fileSize的堆外空间
     */
    public void init() {
        for (int i = 0; i < poolSize; i++) {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);//虚拟机外内存中分配的空间

            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

            availableBuffers.offer(byteBuffer);
        }
    }

    //销毁availableBuffers中所有buffer数据
    public void destroy() {
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }

    //用完了之后,返还一个buffer,对buffer数据进行清理
    public void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        this.availableBuffers.offerFirst(byteBuffer);
    }

    //借一个buffer出去
    public ByteBuffer borrowBuffer() {
        ByteBuffer buffer = availableBuffers.pollFirst();
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }

    //剩余可用的buffers数量
    public int remainBufferNumbs() {
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        return Integer.MAX_VALUE;
    }
}

可以看到内存池在初始化的过程中,将内存用“lock”锁,防止CPU将进程在主存中的这一部分内存给交换回硬盘。

补充

内存池

在netty的过程在使用过程中,也会使用内存池,内存池的优势是集中管理内存的分配和释放,同时提高分配和释放内存的性能,很多框架会先预先申请一大块内存,然后通过提供响应的分配
和释放接口来使用内存,这样系统的性能也会打打提高。

随机读写,顺序读写

随机和顺序读写,是存储器的两种输入输出方式。存储的数据在磁盘中占据空间,对于一个新磁盘,操作系统会将数据文件依次写入磁盘,当有些数据被删除时,就会空出该数据原来占有的存储空间,时间长了,不断的写入、删除数据,就会产生很多零零散散的存储空间,就会造成一个较大的数据文件放在许多不连续的存贮空间上,读写些这部分数据时,就是随机读写,磁头要不断的调整磁道的位置,以在不同位置上的读写数据,相对于连续空间上的顺序读写,要耗时很多。
在开机时、启动大型程序时,电脑要读取大量小文件,而这些文件也不是连续存放的,也属于随机读取的范围。
随机读写:每一段数据有地址码,可以任意跳到某个地址读取该段数据
顺序读写:数据以一定长度连续存储,中间没有地址码,只能顺序读取

改善方法:做磁盘碎片整理,合并碎片文件,但随后还会再产生碎片造成磁盘读写性能下降,而且也解决不了小文件的随机存取的问题,这只是治标。更好的解决办法:更换电子硬盘(SSD),电子盘由于免除了机械硬盘的磁头运动,对于随机数据的读写极大的提高。
举个例子1: SSD的随机读取延迟只有零点几毫秒,而7200RPM的随机读取延迟有7毫秒左右,5400RPM硬盘更是高达9毫秒之多,体现在性能上就是开关机速度。

举个例子2:假设有1到1000笔的数据。
情况1:现在要读出第1000笔,顺序读写的方式是从第1笔开始读,一直找到第1000笔;随机读写是通过运算,很快的找到第1000笔。
情况2:要找出含“abc”的数据,顺序读写还是从第1笔开始读,一直找到第1000笔;随机读写是通过运算,很快的找到“abc”的数据。

总结

本节介绍了rocketmq中的存储细节,包括 mmap 相关,内存池相关知识点。

参考资料

  • http://silence.work/2019/05/03/RocketMQ-Broker
  • https://www.jianshu.com/p/771cce379994
  • https://blog.csdn.net/qq_33611327/article/details/81738195 (推荐一看)

原文地址:https://www.cnblogs.com/Benjious/p/11785942.html

时间: 2024-11-09 01:41:18

消息队列(五)--- RocketMQ-消息存储2的相关文章

浅谈消息队列之RocketMQ

什么是消息队列? 为什么要用消息队列? 即,应用场景是什么,也就是用了有什么好处 解耦 多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败 异步 多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间 削峰/限流 避免流量过大导致应用系统挂掉的情况 使用消息队列需要注意什么? 系统复杂性增加 如何保证消息队列是高可用,即做到集群高可用 如何保证消费的可靠性传输,即不丢消息 如何保证消息不被重复消费,即保证消费的幂等性 如何保证消息的顺序性,即保证

C#消息队列(MQ)零基础从入门到实战演练

一.课程介绍 如果您从工作中之听过但未有接触过消息对队列(MQ),如果你接触过一点关于MQ的知识,如果没有这么的多如果的话......,那么阿笨将通过本次<C#消息队列零基础从入门到实战演练>分享课让您对消息队列有一个实质性的了解和认识,达到实际的灵活贯通和运用.本次分享课您将学习到以下知识点: 1.微软MSMQ的基本使用技能以及MSMQ在WCF技术中的运用. 2.企业级RabbitMQ消息队列的两种消费模式(生产消费和发布订阅)的介绍和使用. 3.如何实现RabbitMQ客户端(Client

消息队列一

参考页面: http://www.yuanjiaocheng.net/webapi/first.html http://www.yuanjiaocheng.net/entity/entitytypes.html http://www.yuanjiaocheng.net/entity/entity-relations.html http://www.yuanjiaocheng.net/entity/entity-lifecycle.html http://www.yuanjiaocheng.net

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

Linux环境编程之IPC进程间通信(五):Posix消息队列1

对于管道和FIFO来说,必须应该先有读取者存在,否则先有写入者是没有意义的.而消息队列则不同,它是一个消息链表,有足够写权限的线程可往别的队列中放置消息,有足够读权限的线程可从队列中取走消息.每个消息都是一个记录,它由发送者赋予一个优先级.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达.消息队列是随内核的持续性,一个进程可以往某个队列写入一些消息,然后终止,再让另外一个进程在以后的某个时刻读出这些消息.这跟管道和FIFO不一样,当一个管道或FIFO的最后一次关闭时

分布式消息队列RocketMQ部署与监控

========================================================================================== 一.RocketMQ简介 ========================================================================================== RocketMQ是一款分布式.队列模型的消息中间件,具有以下特点: 1.支持严格的消息顺序: 2.支持Topi

RocketMQ 消息队列单机部署及使用

转载请注明来源:http://blog.csdn.net/loongshawn/article/details/51086876 相关文章: <RocketMQ 消息队列单机部署及使用> < java编写简单消息队列.实现高德坐标变形服务> 0 RocketMQ简单介绍 0.1 介绍 RocketMQ是一个消息中间件. 消息中间件中有两个角色:消息生产者和消息消费者.RocketMQ里相同有这两个概念.消息生产者负责创建消息并发送到RocketMQ服务器.RocketMQ服务器会将

RocketMq消息队列使用

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性, 目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 比kafka还是有过之无不及,其实kafka文档很丰富 但RocketMQ网上的文章太少,找不到相关的操作教程 于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究 下载源码的地址 https://github.com/alibaba/RocketMQ/relea

RocketMQ 消息队列简单部署

RocketMQ 是alibaba开源的消息队列. 本文使用的是开源版本v3.18 系统: centos6.x最小化安装 需要用到的软件包: jdk-7u67-linux-x64.tar.gz alibaba-rocketmq-3.1.8.tar.gz 开始安装 #tar xvf jdk-7u67-linux-x64.tar.gz -C /opt/ #tar xvf alibaba-rocketmq-3.1.8.tar.gz -C /opt/ #ln -s /opt/jdk1.7.0_67 /o

消息队列 RabbitMq 的五种形式队列

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法. AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件可以无视消息来源传递消息,不受客户端.消息中间件. 不同的开发语言环境等条件的限制 概念解释: Server(Broker):接收客户端连接,实现 AMQP 协议的消息队列和路由功能的进程: Virtual Host:虚拟主机的概念,类似权限控制组,一个 Virtual Host 里可