7. 彤哥说netty系列之Java NIO核心组件之Selector

2022-10-12,,,,

——日拱一卒,不期而至!

你好,我是彤哥,本篇是netty系列的第七篇。

简介

上一章我们一起学习了java nio的核心组件buffer,它通常跟channel一起使用,但是它们在网络io中又该如何使用呢,今天我们将一起学习另一个nio核心组件——selector,没有它可以说就干不起来网络io。

概念

我们先来看两段selector的注释,见类java.nio.channels.selector

注释i

a multiplexor of {@link selectablechannel} objects.

它是selectablechannel对象的多路复用器,从这里我们也可以知道java nio实际上是多路复用io。

selectablechannel有几个子类,你会非常熟悉:

  • datagramchannel,udp协议连接
  • socketchannel,tcp协议连接
  • serversocketchannel,专门处理tcp协议accept事件

我们有必要复习一下多路复用io的流程

第一阶段通过select去轮询检查有没有连接准备好数据,第二阶段把数据从内核空间拷贝到用户空间。

在java中,就是通过selector这个多路复用器来实现第一阶段的。

注释ii

a selector may be created by invoking the {@link #open open} method of this class, which will use the system's default {@link java.nio.channels.spi.selectorprovider selector provider} to create a new selector. a selector may also be created by invoking the {@link java.nio.channels.spi.selectorprovider#openselector openselector} method of a custom selector provider. a selector remains open until it is closed via its {@link #close close} method.

selector可以通过它自己的open()方法创建,它将通过默认的java.nio.channels.spi.selectorprovider类创建一个新的selector。也可以通过实现java.nio.channels.spi.selectorprovider类的抽象方法openselector()来自定义实现一个selector。selector一旦创建将会一直处于open状态直到调用了close()方法为止。

那么,默认使用的selector究竟是哪个呢?

通过跟踪源码:

> java.nio.channels.selector#open()
  1> java.nio.channels.spi.selectorprovider#provider()
    1.1> sun.nio.ch.defaultselectorprovider#create() // 返回windowsselectorprovider
  2> sun.nio.ch.windowsselectorprovider#openselector() // 返回windowsselectorimpl

可以看到,在windows平台下,默认实现的provider是windowsselectorprovider,它的openselector()方法返回的是windowsselectorimpl,它就是windows平台默认的selector实现。

为什么要提到在windows平台呢,难道在linux下面实现不一样?

是滴,因为网络io是跟操作系统息息相关的,不同的操作系统的实现可能都不一样,linux下面jdk的实现完全不一样,那么我们为什么没有感知到呢?我的代码在windows下面写的,拿到linux下面不是一样运行?那是java虚拟机(或者说java运行时环境)帮我们把这个事干了,它屏蔽了跟操作系统相关的细节,这也是java代码可以“write once, run anywhere”的精髓所在。

selector与channel的关系

上面我们说了selector是多路复用器,它是在网络io的第一阶段用来轮询检查有没有连接准备好数据的,那么它和channel是什么关系呢?

selector通过不断轮询的方式同时监听多个channel的事件,注意,这里是同时监听,一旦有channel准备好了,它就会返回这些准备好了的channel,交给处理线程去处理。

所以,在nio编程中,通过selector我们就实现了一个线程同时处理多个连接请求的目标,也可以一定程序降低服务器资源的消耗。

基本用法

创建selector

通过调用selector.open()方法是我们常用的方式:

selector selector = selector.open();

当然,也可以通过实现java.nio.channels.spi.selectorprovider.openselector()抽象方法自定义一个selector。

将channel注册到selector上

为了将channel跟selector绑定在一起,需要将channel注册到selector上,调用channel的register()方法即可:

channel.configureblocking(false);

selectionkey key = channel.register(selector, selectionkey.op_read);

channel必须是非阻塞模式才能注册到selector上,所以,无法将一个filechannel注册到selector,因为filechannel没有所谓的阻塞还是非阻塞模式,本文来源于工从号彤哥读源码。

注册的时候第二个参数传入的是监听的事件,一共有四种事件:

  • connect
  • accept
  • read
  • write

当channel触发了某个事件,通常也叫作那个事件就绪了。比如,数据准备好可以读取了就叫作读就绪了,同样地,还有写就绪、连接就绪、接受就绪,当然后面两个不常听到。

在java中,这四种监听事件是定义在selectionkey中的:

  • selectionkey.op_read,值为 1 << 0 = 0000 0001
  • selectionkey.op_write,值 为 1 << 2 = 0000 0100
  • selectionkey.op_connect,值为 1 << 3 = 0000 1000
  • selectionkey.op_accept,值为 1 << 4 = 0001 0000

所以,也可以通过位或命令监听多个感兴趣的事件:

int interestset = selectionkey.op_read | selectionkey.op_write;

selectionkey

正如上面所看到的,channel注册到selector后返回的是一个selectionkey,所以selectionkey又可以看作是channel和selector之间的一座桥梁,把两者绑定在了一起。

selectionkey具有以下几个重要属性:

  • interest set,感兴趣的事件集
  • ready set,就绪的事件集
  • 保存着的channel
  • 保存着的selector
  • attached object,附件

interest set

里面保存了注册channel到selector时传入的第二个参数,即感兴趣的事件集。

int interestset = selectionkey.interestops();

boolean isinterestedinaccept  = interestset & selectionkey.op_accept;
boolean isinterestedinconnect = interestset & selectionkey.op_connect;
boolean isinterestedinread    = interestset & selectionkey.op_read;
boolean isinterestedinwrite   = interestset & selectionkey.op_write;    

可以通过位与运算查看是否注册了相应的事件。

ready set

里面保存了就绪了的事件集。

int readyset = selectionkey.readyops();
selectionkey.isacceptable();
selectionkey.isconnectable();
selectionkey.isreadable();
selectionkey.iswritable();

可以通过readyops()方法获取所有就绪了的事件,也可以通过isxxxable()方法检查某个事件是否就绪。

保存的channel和selector

channel  channel  = selectionkey.channel();

selector selector = selectionkey.selector();    

通过channel()selector()方法可以获取绑定的channel和selector。

attachment

可以调用attach(obj)方法绑定一个对象到selectionkey上,并在后面需要用到的时候通过attachment()方法取出绑定的对象,也可以翻译为附件,它可以看作是数据传递的一种媒介,跟threadlocal有点类似,在前面绑定数据,在后面使用。

selectionkey.attach(theobject);

object attachedobj = selectionkey.attachment();

当然,也可以在注册channel到selector的时候就绑定附件:

selectionkey key = channel.register(selector, selectionkey.op_read, theobject);

selector.select()

一旦将一个或多个channel注册到selector上了,我们就可以调用它的select()方法了,它会返回注册时感兴趣的事件中就绪的事件,本文来源于工从号彤哥读源码。

select()方法有三种变体:

  • select(),无参数,阻塞直到某个channel有就绪的事件了才返回(当然是我们注册的感兴趣的事件)
  • select(timeout),带超时,阻塞直到某个channel有就绪的事件了,或者超时了才返回
  • selectnow(),立即返回,不会阻塞,不管有没有就绪的channel都立即返回

select()的返回值为int类型,表示两次select()之间就绪的channel,即使上一次调用select()时返回的就绪channel没有被处理,下一次调用select()也不会再返回上一次就绪的channel。比如,第一次调用select()返回了一个就绪的channel,但是没有处理它,第二次调用select()时又有一个channel就绪了,那也只会返回1,而不是2。

selector.selectedkeys()

一旦调用select()方法返回了有就绪的channel,我们就可以使用selectedkeys()方法来获取就绪的channel了。

set<selectionkey> selectedkeys = selector.selectedkeys();    

然后,就可以遍历这些selectionkey来查看感兴趣的事件是否就绪了:

set<selectionkey> selectedkeys = selector.selectedkeys();

iterator<selectionkey> keyiterator = selectedkeys.iterator();

while(keyiterator.hasnext()) {
    
    selectionkey key = keyiterator.next();

    if(key.isacceptable()) {
        // a connection was accepted by a serversocketchannel.

    } else if (key.isconnectable()) {
        // a connection was established with a remote server.

    } else if (key.isreadable()) {
        // a channel is ready for reading

    } else if (key.iswritable()) {
        // a channel is ready for writing
    }

    keyiterator.remove();
}

最后,一定要记得调用keyiterator.remove();移除已经处理的selectionkey。

selector.wakeup()

前面我们说了调用select()方法时,调用者线程会进入阻塞状态,直到有就绪的channel才会返回。其实也不一定,wakeup()就是用来破坏规则的,可以在另外一个线程调用wakeup()方法强行唤醒这个阻塞的线程,这样select()方法也会立即返回。

如果调用wakeup()时并没有线程阻塞在select()上,那么,下一次调用select()将立即返回,不会进入阻塞状态。这跟locksupport.unpark()方法是比较类似的。

selector.close()

调用close()方法将会关闭selector,同时也会将关联的selectionkey失效,但不会关闭channel。

举个栗子

public class echoserver {
    public static void main(string[] args) throws ioexception {
        // 创建一个selector
        selector selector = selector.open();
        // 创建serversocketchannel
        serversocketchannel serversocketchannel = serversocketchannel.open();
        // 绑定8080端口
        serversocketchannel.bind(new inetsocketaddress(8080));
        // 设置为非阻塞模式,本文来源于工从号彤哥读源码
        serversocketchannel.configureblocking(false);
        // 将channel注册到selector上,并注册accept事件
        serversocketchannel.register(selector, selectionkey.op_accept);

        while (true) {
            // 阻塞在select上
            selector.select();

            // 如果使用的是select(timeout)或selectnow()需要判断返回值是否大于0

            // 有就绪的channel
            set<selectionkey> selectionkeys = selector.selectedkeys();
            // 遍历selectkeys
            iterator<selectionkey> iterator = selectionkeys.iterator();
            while (iterator.hasnext()) {
                selectionkey selectionkey = iterator.next();
                // 如果是accept事件
                if (selectionkey.isacceptable()) {
                    // 强制转换为serversocketchannel
                    serversocketchannel ssc = (serversocketchannel) selectionkey.channel();
                    socketchannel socketchannel = ssc.accept();
                    system.out.println("accept new conn: " + socketchannel.getremoteaddress());
                    socketchannel.configureblocking(false);
                    // 将socketchannel注册到selector上,并注册读事件
                    socketchannel.register(selector, selectionkey.op_read);
                } else if (selectionkey.isreadable()) {
                    // 如果是读取事件
                    // 强制转换为socketchannel
                    socketchannel socketchannel = (socketchannel) selectionkey.channel();
                    // 创建buffer用于读取数据
                    bytebuffer buffer = bytebuffer.allocate(1024);
                    // 将数据读入到buffer中
                    int length = socketchannel.read(buffer);
                    if (length > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        // 将数据读入到byte数组中
                        buffer.get(bytes);

                        // 换行符会跟着消息一起传过来
                        string content = new string(bytes, "utf-8").replace("\r\n", "");
                        if (content.equalsignorecase("quit")) {
                            selectionkey.cancel();
                            socketchannel.close();
                        } else {
                            system.out.println("receive msg: " + content);
                        }
                    }
                }
                iterator.remove();
            }
        }
    }
}

总结

今天我们学习了java nio核心组件selector,到这里,nio的三个最重要的核心组件我们就学习完毕了,说实话,nio这块最重要的还是思维的问题,时刻记着在nio中一个线程是可以处理多个连接的。

看着java原生nio实现网络编程似乎也没什么困难的吗?那么为什么还要有netty呢?下一章我们将正式进入netty的学习之中,我们将在其中寻找答案。

最后,也欢迎来我的工从号彤哥读源码系统地学习源码&架构的知识。

《7. 彤哥说netty系列之Java NIO核心组件之Selector.doc》

下载本文的Word格式文档,以方便收藏与打印。