RocketMQ设计之同步刷盘

2022-07-16,,

同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的commitlog文件。

commitlog的handlediskflush方法:

public void handlediskflush(appendmessageresult result, putmessageresult putmessageresult, messageext messageext) {
    // synchronization flush
    if (flushdisktype.sync_flush == this.defaultmessagestore.getmessagestoreconfig().getflushdisktype()) {
        final groupcommitservice service = (groupcommitservice) this.flushcommitlogservice;
        if (messageext.iswaitstoremsgok()) {
            groupcommitrequest request = new groupcommitrequest(result.getwroteoffset() + result.getwrotebytes());
            service.putrequest(request);
            boolean flushok = request.waitforflush(this.defaultmessagestore.getmessagestoreconfig().getsyncflushtimeout());
            if (!flushok) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageext.gettopic() + " tags: " + messageext.gettags()
                    + " client address: " + messageext.getbornhoststring());
                putmessageresult.setputmessagestatus(putmessagestatus.flush_disk_timeout);
            }
        } else {
            service.wakeup();
        }
    }
    // asynchronous flush
    else {
        if (!this.defaultmessagestore.getmessagestoreconfig().istransientstorepoolenable()) {
            flushcommitlogservice.wakeup();
        } else {
            commitlogservice.wakeup();
        }
    }
}


class groupcommitservice extends flushcommitlogservice {
        private volatile list<groupcommitrequest> requestswrite = new arraylist<groupcommitrequest>();
        private volatile list<groupcommitrequest> requestsread = new arraylist<groupcommitrequest>();

        //提交刷盘任务到任务列表
        public synchronized void putrequest(final groupcommitrequest request) {
            synchronized (this.requestswrite) {
                this.requestswrite.add(request);
            }
            if (hasnotified.compareandset(false, true)) {
                waitpoint.countdown(); // notify
            }
        }

        private void swaprequests() {
            list<groupcommitrequest> tmp = this.requestswrite;
            this.requestswrite = this.requestsread;
            this.requestsread = tmp;
        }

        private void docommit() {
            synchronized (this.requestsread) {
                if (!this.requestsread.isempty()) {
                    for (groupcommitrequest req : this.requestsread) {
                        // there may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushok = false;
                        for (int i = 0; i < 2 && !flushok; i++) {
                            flushok = commitlog.this.mappedfilequeue.getflushedwhere() >= req.getnextoffset();

                            if (!flushok) {
                                commitlog.this.mappedfilequeue.flush(0);
                            }
                        }

                        req.wakeupcustomer(flushok);
                    }

                    long storetimestamp = commitlog.this.mappedfilequeue.getstoretimestamp();
                    if (storetimestamp > 0) {
                        commitlog.this.defaultmessagestore.getstorecheckpoint().setphysicmsgtimestamp(storetimestamp);
                    }

                    this.requestsread.clear();
                } else {
                    // because of individual messages is set to not sync flush, it
                    // will come to this process
                    commitlog.this.mappedfilequeue.flush(0);
                }
            }
        }

        public void run() {
            commitlog.log.info(this.getservicename() + " service started");

            while (!this.isstopped()) {
                try {
                    this.waitforrunning(10);
                    this.docommit();
                } catch (exception e) {
                    commitlog.log.warn(this.getservicename() + " service has exception. ", e);
                }
            }

            // under normal circumstances shutdown, wait for the arrival of the
            // request, and then flush
            try {
                thread.sleep(10);
            } catch (interruptedexception e) {
                commitlog.log.warn("groupcommitservice exception, ", e);
            }

            synchronized (this) {
                this.swaprequests();
            }

            this.docommit();

            commitlog.log.info(this.getservicename() + " service end");
        }

        @override
        protected void onwaitend() {
            this.swaprequests();
        }

        @override
        public string getservicename() {
            return groupcommitservice.class.getsimplename();
        }

        @override
        public long getjointime() {
            return 1000 * 60 * 5;
        }
    }

groupcommitrequest是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程

groupcommitservice每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。

  • putrequest(request) 提交刷盘任务到任务列表
  • request.waitforflush同步等待groupcommitservice将任务列表中的任务刷盘完成。

两个队列读写分离,requestswrite是写队列,用户保存添加进来的刷盘任务,requestsread是读队列,在刷盘之前会把写队列的数据放入读队列。

commitlog的docommit方法:

private void docommit() {
            synchronized (this.requestsread) {
                if (!this.requestsread.isempty()) {
                    for (groupcommitrequest req : this.requestsread) {
                        // there may be a message in the next file, so a maximum of
                        // two times the flush
                        boolean flushok = false;
                        for (int i = 0; i < 2 && !flushok; i++) {
                            //根据offset确定是否已经刷盘
                            flushok = commitlog.this.mappedfilequeue.getflushedwhere() >= req.getnextoffset();

                            if (!flushok) {
                                commitlog.this.mappedfilequeue.flush(0);
                            }
                        }

                        req.wakeupcustomer(flushok);
                    }

                    long storetimestamp = commitlog.this.mappedfilequeue.getstoretimestamp();
                    if (storetimestamp > 0) {
                        commitlog.this.defaultmessagestore.getstorecheckpoint().setphysicmsgtimestamp(storetimestamp);
                    }
                    //清空已刷盘的列表
                    this.requestsread.clear();
                } else {
                    // because of individual messages is set to not sync flush, it
                    // will come to this process
                    commitlog.this.mappedfilequeue.flush(0);
                }
            }
        }
  • 刷盘的时候依次读取requestsread中的数据写入磁盘,
  • 写入完成后清空requestsread

读写分离设计的目的是在刷盘时不影响任务提交到列表。

commitlog.this.mappedfilequeue.flush(0);是刷盘操作:

public boolean flush(final int flushleastpages) {
    boolean result = true;
    mappedfile mappedfile = this.findmappedfilebyoffset(this.flushedwhere, this.flushedwhere == 0);
    if (mappedfile != null) {
        long tmptimestamp = mappedfile.getstoretimestamp();
        int offset = mappedfile.flush(flushleastpages);
        long where = mappedfile.getfilefromoffset() + offset;
        result = where == this.flushedwhere;
        this.flushedwhere = where;
        if (0 == flushleastpages) {
            this.storetimestamp = tmptimestamp;
        }
    }

    return result;
}

通过mappedfile映射的commitlog文件写入磁盘

这就是rocketmq高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而groupcommitservice每隔10毫秒写一批数据到磁盘。

到此这篇关于rocketmq设计之同步刷盘的文章就介绍到这了,更多相关rocketmq同步刷盘内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

《RocketMQ设计之同步刷盘.doc》

下载本文的Word格式文档,以方便收藏与打印。