天天看点

RocketMQ源码解析:同步刷盘和异步刷盘的实现

RocketMQ源码解析:同步刷盘和异步刷盘的实现

同步刷盘

在RocketMQ中有同步刷盘和异步刷盘两种方式

RocketMQ源码解析:同步刷盘和异步刷盘的实现

2种刷盘方式适用的场景如下

刷盘方式 适用场景
同步刷盘 数据可靠性高,适用于金融等对数据可靠性要求高的场景,性能比异步刷盘要低
异步刷盘 性能和吞吐量高 , Broker端异常关闭时,有少量消息丢失

根据前面的章节我们知道RocketMQ会通过SendMessageProcessor来处理刷盘的消息,当消息存储到内存中后,就开始刷盘

RocketMQ源码解析:同步刷盘和异步刷盘的实现

异步刷盘的方式有两种,第一种Mmap+PageCache(默认的异步刷盘方式),上面说到的同步刷盘也是这种机制,代码实现如下

@Test
public void writeCaseOne() throws Exception {
    File file = new File("/Users/peng/software/rocketmq/test/case1.txt");
    FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
    MappedByteBuffer byteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 2048);
    byteBuffer.put("hello mmap\n".getBytes());
    // 将 pagecache 中的内容强制刷到磁盘
    byteBuffer.force();
}      

第二种是DirectByteBuffer+PageCache,也就是直接写堆外内存

@Test
public void writeCaseTwo() throws Exception {
    File file = new File("/Users/peng/software/rocketmq/test/case2.txt");
    FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(20);
    byteBuffer.put("hello mmap\n".getBytes());
    byteBuffer.flip();
    while (byteBuffer.hasRemaining()) {
        fileChannel.write(byteBuffer);
    }
    // 将 pagecache 中的内容强制刷到磁盘
    fileChannel.force(false);
}      
RocketMQ源码解析:同步刷盘和异步刷盘的实现

从CommitLog#submitFlushRequest方法可以看到刷盘的逻辑

RocketMQ源码解析:同步刷盘和异步刷盘的实现

当broker端配置的是同步刷盘,但是发送过来的消息不需要等待消息刷盘完成,就会退化成异步刷盘,我们先看同步刷盘,在RocketMQ中,并不是往内存中放一条消息,就刷盘一次,这样效率太低。RocketMQ会每隔10ms统一执行刷盘请求来提高效率

  1. 首先把刷盘的请求封装成GroupCommitRequest,然后放到GroupCommitService的阻塞队列中
  2. GroupCommitService每隔10ms将目前阻塞队列中的刷盘请求统一执行,然后唤醒阻塞等待的线程
  3. RocketMQ源码解析:同步刷盘和异步刷盘的实现
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // 有数据过来会结束等待的
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // 省略部分逻辑
}      
RocketMQ源码解析:同步刷盘和异步刷盘的实现

不断执行doCommit方法进行刷盘,当刷盘完成时,会唤醒等待刷盘的线程

这里有个需要注意的细节点,我我们放请求的时候是放到requestsWrite中,但是读的时候却是在requestsRead中,那么requestsRead中能读取到值吗?

// GroupCommitService
// 读请求列表
private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
// 读请求列表
private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();      

我们来看ServiceThread类的waitForRunning方法

RocketMQ源码解析:同步刷盘和异步刷盘的实现

其实当每次等待结束后都会调用onWaitEnd方法,而GroupCommitService重写了这个方法,在这个方法内部调用swapRequests方法

private void swapRequests() {
    lock.lock();
    try {
        LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    } finally {
        lock.unlock();
    }
}      

swapRequests方法会将requestsWrite和requestsRead中的内容进行交换。

RocketMQ源码解析:同步刷盘和异步刷盘的实现

首先通过上次刷盘位置定位到MappedFile,然后开始刷盘

RocketMQ源码解析:同步刷盘和异步刷盘的实现

可以看到有两种刷盘的方式,调用FileChannel#force(异步刷盘并且开启transientStorePool)或者MappedByteBuffer#force(同步刷盘或者异步刷盘但是不开启transientStorePool)

RocketMQ源码解析:同步刷盘和异步刷盘的实现

当刷盘的时候,需要累积到一定页数才开始刷,同步刷盘是0页,异步输盘是4页。至此同步输盘的逻辑就梳理完了。

其实异步输盘不开启transientStorePool时,执行的逻辑和这个差不多,只是累计的页数不相同而已

异步刷盘

RocketMQ源码解析:同步刷盘和异步刷盘的实现

不开启TransientStorePool

当不开启TransientStorePoo时,会先唤醒FlushRealTimeService线程,然后开始开始刷盘

RocketMQ源码解析:同步刷盘和异步刷盘的实现

先算出输盘的页数,默认4页,如果10s没有刷盘了,则将页数设为0,然后执行MappedFileQueue#flush方法,这个方法在同步刷盘已经分析过了,不再分析。

开启TransientStorePool

当开启TransientStorePool是会先唤醒CommitRealTimeService,将ByteBuffer中的内容刷入FileChannel,接着唤醒FlushRealTimeService线程,将FileChannel中的数据刷入磁盘

RocketMQ源码解析:同步刷盘和异步刷盘的实现

先算出commit的页数,默认4页,如果200ms没有commit了,则将页数设为0(在后续执行流程可以看到commit也对页数有要求),然后执行MappedFileQueue#commit方法,将将ByteBuffer中的内容刷入FileChannelMappedFile#commit0

RocketMQ源码解析:同步刷盘和异步刷盘的实现

至于这两种刷盘方式的好处,我个人理解也不是很深刻,因此转一下社区胡宗棠老师对这个问题的解读

  1. 第一种,Mmap+PageCache的方式,读写消息都走的是pageCache,这样子读写都在pagecache里面不可避免会有锁的问题,在并发的读写操作情况下,会出现缺页中断降低,内存加锁,污染页的回写。
  2. 第二种,DirectByteBuffer(堆外内存)+PageCache的两层架构方式,这样子可以实现读写消息分离,写入消息时候写到的是DirectByteBuffer——堆外内存中,读消息走的是PageCache(对于,DirectByteBuffer是两步刷盘,一步是刷到PageCache,还有一步是刷到磁盘文件中),带来的好处就是,避免了内存操作的很多容易堵的地方,降低了时延,比如说缺页中断降低,内存加锁,污染页的回写。

参考博客