Netty源码分析 (八)----- write过程 源码分析

2022-10-16,,,,

上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeandflush。

主要内容

本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

  1. pipeline中的标准链表结构
  2. java对象编码过程
  3. write:写队列
  4. flush:刷新写队列
  5. writeandflush: 写队列并刷新

pipeline中的标准链表结构

一个标准的pipeline链式结构如下

数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务handler处理,调用write,将结果对象写出去。而写的过程先通过tail节点,然后通过encoder节点将对象编码成bytebuf,最后将该bytebuf对象传递到head节点,调用底层的unsafe写到jdk底层管道

java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的bytebuf,写到管道里?

我们先看下调用write的code

businesshandler

protected void channelread0(channelhandlercontext ctx, request request) throws exception {
    response response = dobusiness(request);

    if (response != null) {
        ctx.channel().write(response);
    }
}

业务处理器接受到请求之后,做一些业务处理,返回一个response,然后,response在pipeline中传递,落到 encoder节点,我们来跟踪一下 ctx.channel().write(response);

public channelfuture write(object msg) {
    return this.pipeline.write(msg);
}

调用了channel中的pipeline中的write方法,我们接着看

public final channelfuture write(object msg) {
    return this.tail.write(msg);
}

pipeline中有属性tail,调用tail中的write,由此我们知道write消息的时候,从tail开始,接着往下看

private void write(object msg, boolean flush, channelpromise promise) {
    abstractchannelhandlercontext next = this.findcontextoutbound();
    object m = this.pipeline.touch(msg, next);
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        if (flush) {
            next.invokewriteandflush(m, promise);
        } else {
            next.invokewrite(m, promise);
        }
    } else {
        object task;
        if (flush) {
            task = abstractchannelhandlercontext.writeandflushtask.newinstance(next, m, promise);
        } else {
            task = abstractchannelhandlercontext.writetask.newinstance(next, m, promise);
        }

        safeexecute(executor, (runnable)task, promise, m);
    }

}

中间我省略了几个重载的方法,我们来看看第一行代码,next = this.findcontextoutbound();

private abstractchannelhandlercontext findcontextoutbound() {
    abstractchannelhandlercontext ctx = this;

    do {
        ctx = ctx.prev;
    } while(!ctx.outbound);

    return ctx;
}

通过 ctx = ctx.prev; 我们知道从tail开始找到pipeline中的第一个outbound的handler,然后调用 invokewrite(m, promise),此时找到的第一个outbound的handler就是我们自定义的编码器encoder

我们接着看 next.invokewrite(m, promise);

private void invokewrite(object msg, channelpromise promise) {
    if (this.invokehandler()) {
        this.invokewrite0(msg, promise);
    } else {
        this.write(msg, promise);
    }

}
private void invokewrite0(object msg, channelpromise promise) {
    try {
        ((channeloutboundhandler)this.handler()).write(this, msg, promise);
    } catch (throwable var4) {
        notifyoutboundhandlerexception(var4, promise);
    }

}

一路代码跟下来,我们可以知道是调用了第一个outbound类型的handler中的write方法,也就是第一个调用的是我们自定义编码器encoder的write方法

我们来看看自定义encoder

public class encoder extends messagetobyteencoder<response> {
    @override
    protected void encode(channelhandlercontext ctx, response response, bytebuf out) throws exception {
        out.writebyte(response.getversion());
        out.writeint(4 + response.getdata().length);
        out.writebytes(response.getdata());
    }
}

自定义encoder继承 messagetobyteencoder ,并且重写了 encode方法,这就是编码器的核心,我们先来看 messagetobyteencoder

public abstract class messagetobyteencoder<i> extends channeloutboundhandleradapter {

我们看到 messagetobyteencoder 继承了 channeloutboundhandleradapter,说明了 encoder 是一个 outbound的handler

我们来看看 encoder 的父类 messagetobyteencoder中的write方法

messagetobyteencoder

@override
public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception {
    bytebuf buf = null;
    try {
        // 判断当前handelr是否能处理写入的消息
        if (acceptoutboundmessage(msg)) {
            @suppresswarnings("unchecked")
            // 强制换换
            i cast = (i) msg;
            // 分配一段butebuf
            buf = allocatebuffer(ctx, cast, preferdirect);
            try {
            // 调用encode,这里就调回到  `encoder` 这个handelr中    
                encode(ctx, cast, buf);
            } finally {
                // 既然自定义java对象转换成bytebuf了,那么这个对象就已经无用了,释放掉
                // (当传入的msg类型是bytebuf的时候,就不需要自己手动释放了)
                referencecountutil.release(cast);
            }
            // 如果buf中写入了数据,就把buf传到下一个节点
            if (buf.isreadable()) {
                ctx.write(buf, promise);
            } else {
            // 否则,释放buf,将空数据传到下一个节点    
                buf.release();
                ctx.write(unpooled.empty_buffer, promise);
            }
            buf = null;
        } else {
            // 如果当前节点不能处理传入的对象,直接扔给下一个节点处理
            ctx.write(msg, promise);
        }
    } catch (encoderexception e) {
        throw e;
    } catch (throwable e) {
        throw new encoderexception(e);
    } finally {
        // 当buf在pipeline中处理完之后,释放
        if (buf != null) {
            buf.release();
        }
    }
}

这里,我们详细阐述一下encoder是如何处理传入的java对象的

1.判断当前handler是否能处理写入的消息,如果能处理,进入下面的流程,否则,直接扔给下一个节点处理

2.将对象强制转换成encoder可以处理的 response对象

3.分配一个bytebuf

4.调用encoder,即进入到 encoderencode方法,该方法是用户代码,用户将数据写入bytebuf

5.既然自定义java对象转换成bytebuf了,那么这个对象就已经无用了,释放掉,(当传入的msg类型是bytebuf的时候,就不需要自己手动释放了)

6.如果buf中写入了数据,就把buf传到下一个节点,否则,释放buf,将空数据传到下一个节点

7.最后,当buf在pipeline中处理完之后,释放节点

总结一点就是,encoder节点分配一个bytebuf,调用encode方法,将java对象根据自定义协议写入到bytebuf,然后再把bytebuf传入到下一个节点,在我们的例子中,最终会传入到head节点,因为head节点是一个outbount类型的handler

headcontext

public void write(channelhandlercontext ctx, object msg, channelpromise promise) throws exception {
    unsafe.write(msg, promise);
}

这里的msg就是前面在encoder节点中,载有java对象数据的自定义bytebuf对象,进入下一节

write:写队列

我们来看看channel中unsafe的write方法,先来看看其中的一个属性

abstractunsafe

protected abstract class abstractunsafe implements unsafe {
    private volatile channeloutboundbuffer outboundbuffer = new channeloutboundbuffer(abstractchannel.this);

我们来看看 channeloutboundbuffer 这个类

public final class channeloutboundbuffer {
    private final channel channel;
    private channeloutboundbuffer.entry flushedentry;
    private channeloutboundbuffer.entry unflushedentry;
    private channeloutboundbuffer.entry tailentry;

channeloutboundbuffer内部维护了一个entry链表,并使用entry封装msg。其中的属性我们下面会详细讲

我们回到正题,接着看 unsafe.write(msg, promise);

abstractunsafe

@override
public final void write(object msg, channelpromise promise) {
    asserteventloop();

    channeloutboundbuffer outboundbuffer = this.outboundbuffer;

    int size;
    try {
        msg = filteroutboundmessage(msg);
        size = pipeline.estimatorhandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (throwable t) {
        safesetfailure(promise, t);
        referencecountutil.release(msg);
        return;
    }

    outboundbuffer.addmessage(msg, size, promise);
}

1.调用 filteroutboundmessage() 方法,将待写入的对象过滤,把非bytebuf对象和fileregion过滤,把所有的非直接内存转换成直接内存directbuffer

@override
protected final object filteroutboundmessage(object msg) {
    if (msg instanceof bytebuf) {
        bytebuf buf = (bytebuf) msg;
        if (buf.isdirect()) {
            return msg;
        }

        return newdirectbuffer(buf);
    }

    if (msg instanceof fileregion) {
        return msg;
    }

    throw new unsupportedoperationexception(
            "unsupported message type: " + stringutil.simpleclassname(msg) + expected_types);
}

2.接下来,估算出需要写入的bytebuf的size
3.最后,调用 channeloutboundbuffer 的addmessage(msg, size, promise) 方法,所以,接下来,我们需要重点看一下这个方法干了什么事情

channeloutboundbuffer

public void addmessage(object msg, int size, channelpromise promise) {
    // 创建一个待写出的消息节点
    entry entry = entry.newinstance(msg, size, total(msg), promise);
    if (tailentry == null) {
        flushedentry = null;
        tailentry = entry;
    } else {
        entry tail = tailentry;
        tail.next = entry;
        tailentry = entry;
    }
    if (unflushedentry == null) {
        unflushedentry = entry;
    }

    incrementpendingoutboundbytes(size, false);
}

想要理解上面这段代码,必须得掌握写缓存中的几个消息指针,如下图

channeloutboundbuffer 里面的数据结构是一个单链表结构,每个节点是一个 entryentry 里面包含了待写出bytebuf 以及消息回调 promise,下面分别是三个指针的作用

1.flushedentry 指针表示第一个被写到操作系统socket缓冲区中的节点

2.unflushedentry 指针表示第一个未被写入到操作系统socket缓冲区中的节点

3.tailentry指针表示channeloutboundbuffer缓冲区的最后一个节点

初次调用 addmessage 之后,各个指针的情况为

fushedentry指向空,unfushedentry和 tailentry 都指向新加入的节点

第二次调用 addmessage之后,各个指针的情况为

第n次调用 addmessage之后,各个指针的情况为

可以看到,调用n次addmessage,flushedentry指针一直指向null,表示现在还未有节点需要写出到socket缓冲区,而unfushedentry之后有n个节点,表示当前还有n个节点尚未写出到socket缓冲区中去

flush:刷新写队列

不管调用channel.flush(),还是ctx.flush(),最终都会落地到pipeline中的head节点

headcontext

@override
public void flush(channelhandlercontext ctx) throws exception {
    unsafe.flush();
}

之后进入到abstractunsafe

abstractunsafe

public final void flush() {
   asserteventloop();

   channeloutboundbuffer outboundbuffer = this.outboundbuffer;
   if (outboundbuffer == null) {
       return;
   }

   outboundbuffer.addflush();
   flush0();
}

flush方法中,先调用 outboundbuffer.addflush();

channeloutboundbuffer

public void addflush() {
    entry entry = unflushedentry;
    if (entry != null) {
        if (flushedentry == null) {
            flushedentry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setuncancellable()) {
                int pending = entry.cancel();
                decrementpendingoutboundbytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedentry = null;
    }
}

可以结合前面的图来看,首先拿到 unflushedentry 指针,然后将 flushedentry 指向unflushedentry所指向的节点,调用完毕之后,三个指针的情况如下所示

 

相当于所有的节点都即将开始推送出去

接下来,调用 flush0();

abstractunsafe

protected void flush0() {
    dowrite(outboundbuffer);
}

发现这里的核心代码就一个 dowrite,继续跟

abstractniobytechannel

protected void dowrite(channeloutboundbuffer in) throws exception {
    int writespincount = -1;

    boolean setopwrite = false;
    for (;;) {
        // 拿到第一个需要flush的节点的数据
        object msg = in.current();

        if (msg instanceof bytebuf) {
            // 强转为bytebuf,若发现没有数据可读,直接删除该节点
            bytebuf buf = (bytebuf) msg;

            boolean done = false;
            long flushedamount = 0;
            // 拿到自旋锁迭代次数
            if (writespincount == -1) {
                writespincount = config().getwritespincount();
            }
            // 自旋,将当前节点写出
            for (int i = writespincount - 1; i >= 0; i --) {
                int localflushedamount = dowritebytes(buf);
                if (localflushedamount == 0) {
                    setopwrite = true;
                    break;
                }

                flushedamount += localflushedamount;
                if (!buf.isreadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedamount);

            // 写完之后,将当前节点删除
            if (done) {
                in.remove();
            } else {
                break;
            }
        } 
    }
}

这里略微有点复杂,我们分析一下

1.第一步,调用current()先拿到第一个需要flush的节点的数据

 channeloutboundbuffer

public object current() {
    entry entry = flushedentry;
    if (entry == null) {
        return null;
    }

    return entry.msg;
}

2.第二步,拿到自旋锁的迭代次数

if (writespincount == -1) {
    writespincount = config().getwritespincount();
}

3.自旋的方式将bytebuf写出到jdk nio的channel

for (int i = writespincount - 1; i >= 0; i --) {
    int localflushedamount = dowritebytes(buf);
    if (localflushedamount == 0) {
        setopwrite = true;
        break;
    }

    flushedamount += localflushedamount;
    if (!buf.isreadable()) {
        done = true;
        break;
    }
}

dowritebytes 方法跟进去

protected int dowritebytes(bytebuf buf) throws exception {
    final int expectedwrittenbytes = buf.readablebytes();
    return buf.readbytes(javachannel(), expectedwrittenbytes);
}

我们发现,出现了 javachannel(),表明已经进入到了jdk nio channel的领域,我们来看看 buf.readbytes(javachannel(), expectedwrittenbytes);

public int readbytes(gatheringbytechannel out, int length) throws ioexception {
    this.checkreadablebytes(length);
    int readbytes = this.getbytes(this.readerindex, out, length);
    this.readerindex += readbytes;
    return readbytes;
}

我们来看关键代码 this.getbytes(this.readerindex, out, length)

private int getbytes(int index, gatheringbytechannel out, int length, boolean internal) throws ioexception {
    this.checkindex(index, length);
    if (length == 0) {
        return 0;
    } else {
        bytebuffer tmpbuf;
        if (internal) {
            tmpbuf = this.internalniobuffer();
        } else {
            tmpbuf = ((bytebuffer)this.memory).duplicate();
        }

        index = this.idx(index);
        tmpbuf.clear().position(index).limit(index + length);
        //将tmpbuf中的数据写到out中
        return out.write(tmpbuf);
    }
}

我们来看看out.write(tmpbuf)

public int write(bytebuffer src) throws ioexception {
    ensureopen();
    if (!writable)
        throw new nonwritablechannelexception();
    synchronized (positionlock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            ti = threads.add();
            if (!isopen())
                return 0;
            do {
                n = ioutil.write(fd, src, -1, nd);
            } while ((n == iostatus.interrupted) && isopen());
            return iostatus.normalize(n);
        } finally {
            threads.remove(ti);
            end(n > 0);
            assert iostatus.check(n);
        }
    }
}

和read实现一样,socketchannelimpl的write方法通过ioutil的write实现:关键代码 n = ioutil.write(fd, src, -1, nd);

static int write(filedescriptor var0, bytebuffer var1, long var2, nativedispatcher var4) throws ioexception {
    //如果是directbuffer,直接写,将堆外缓存中的数据拷贝到内核缓存中进行发送
    if (var1 instanceof directbuffer) {
        return writefromnativebuffer(var0, var1, var2, var4);
    } else {
        //非directbuffer
        //获取已经读取到的位置
        int var5 = var1.position();
        //获取可以读到的位置
        int var6 = var1.limit();

        assert var5 <= var6;
        //申请一个原buffer可读大小的directbytebuffer
        int var7 = var5 <= var6 ? var6 - var5 : 0;
        bytebuffer var8 = util.gettemporarydirectbuffer(var7);

        int var10;
        try {

            var8.put(var1);
            var8.flip();
            var1.position(var5);
            //通过directbuffer写,将堆外缓存的数据拷贝到内核缓存中进行发送
            int var9 = writefromnativebuffer(var0, var8, var2, var4);
            if (var9 > 0) {
                var1.position(var5 + var9);
            }

            var10 = var9;
        } finally {
            //回收分配的directbytebuffer
            util.offerfirsttemporarydirectbuffer(var8);
        }

        return var10;
    }
}

代码逻辑我们就不再讲了,代码注释已经很清楚了,这里我们关注一点,我们可以看看我们前面的一个方法 filteroutboundmessage(),将待写入的对象过滤,把非bytebuf对象和fileregion过滤,把所有的非直接内存转换成直接内存directbuffer

说明到了这一步所有的 var1 意境是直接内存directbuffer,就不需要走到else,就不需要write两次了

4.删除该节点

节点的数据已经写入完毕,接下来就需要删除该节点

channeloutboundbuffer

public boolean remove() {
    entry e = flushedentry;
    object msg = e.msg;

    channelpromise promise = e.promise;
    int size = e.pendingsize;

    removeentry(e);

    if (!e.cancelled) {
        referencecountutil.saferelease(msg);
        safesuccess(promise);
    }

    // recycle the entry
    e.recycle();

    return true;
}

首先拿到当前被flush掉的节点(flushedentry所指),然后拿到该节点的回调对象 channelpromise, 调用 removeentry()方法移除该节点

private void removeentry(entry e) {
    if (-- flushed == 0) {
        flushedentry = null;
        if (e == tailentry) {
            tailentry = null;
            unflushedentry = null;
        }
    } else {
        flushedentry = e.next;
    }
}

这里的remove是逻辑移除,只是将flushedentry指针移到下个节点,调用完毕之后,节点图示如下

writeandflush: 写队列并刷新

理解了write和flush这两个过程,writeandflush 也就不难了

public final channelfuture writeandflush(object msg) {
    return tail.writeandflush(msg);
}

public channelfuture writeandflush(object msg) {
    return writeandflush(msg, newpromise());
}

public channelfuture writeandflush(object msg, channelpromise promise) {
    write(msg, true, promise);

    return promise;
}

private void write(object msg, boolean flush, channelpromise promise) {
    abstractchannelhandlercontext next = findcontextoutbound();
    eventexecutor executor = next.executor();
    if (executor.ineventloop()) {
        if (flush) {
            next.invokewriteandflush(m, promise);
        } else {
            next.invokewrite(m, promise);
        }
    } 
}

可以看到,最终,通过一个boolean变量,表示是调用 invokewriteandflush,还是 invokewriteinvokewrite便是我们上文中的write过程

private void invokewriteandflush(object msg, channelpromise promise) {
    invokewrite0(msg, promise);
    invokeflush0();
}

可以看到,最终调用的底层方法和单独调用 write 和 flush 是一样的

private void invokewrite(object msg, channelpromise promise) {
        invokewrite0(msg, promise);
}

private void invokeflush(object msg, channelpromise promise) {
        invokeflush0(msg, promise);
}

由此看来,invokewriteandflush基本等价于write方法之后再来一次flush

总结

1.pipeline中的编码器原理是创建一个bytebuf,将java对象转换为bytebuf,然后再把bytebuf继续向前传递

2.调用write方法并没有将数据写到socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出

3.writeandflush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功

4.netty中的缓冲区中的bytebuf为directbytebuf

 

 

 

《Netty源码分析 (八)----- write过程 源码分析.doc》

下载本文的Word格式文档,以方便收藏与打印。