Netty学习笔记之用NIO实现一个echo服务器

2022-07-28,,,,

文章目录

  • 需求分析
  • 代码实战
  • 增加需求
  • 代码改进

需求分析

了解了NIO以及其组件,下面我要用NIO编程知识来实现一个echo服务器

所谓echo服务器,及客户端像给服务器发送了什么消息,服务器就发回什么消息。

下面我们来尝试实现这个服务器。

代码实战

话不多说,先直接上代码。

public class MainDemo1 {
    //处理拿到可读事件的socket
    static class clientProcessor implements Runnable{
        private Selector selector;
        public clientProcessor(Selector selector){
            this.selector = selector;
        }
        @Override
        public void run() {
            while (true){
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()){
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false)
                        {
                            continue;
                        }
                        if (key.isReadable())
                        {//代码①
                           ByteBuffer byteBuffer = ByteBuffer.allocate(16);
                           SocketChannel clientChannel  = (SocketChannel) key.channel();
                           int read = clientChannel.read(byteBuffer);
                           if (read == -1){
                               key.cancel();
                               clientChannel.close();
                           }else {
                               byteBuffer.flip();
                               byte[] bytes = new byte[byteBuffer.limit()];
                               byteBuffer.get(bytes);
                               System.out.println(bytes);
                           }
                           iterator.remove();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        //新建服务端管道对象并设置为非阻塞
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(9999));
        serverSocketChannel.configureBlocking(false);
        //基于机器性能创建选择器数组
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        //初始化数组,依次启动线程
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new clientProcessor(selector)).start();
        }
        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
        while (true){
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()){
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.register(selectors[id.getAndIncrement()%selectors.length], SelectionKey.OP_READ);
                iterator.remove();
            }
        }
    }
}


如上代码,已经可以初步实现我们所需要的服务器,分析一下代码。

先创建一个服务端管道,服务端管道注册选择器的接入事件,然后循环select等待客户端的接入。

客户端接入后,在自己的线程内,selector返回调用后产生的选择键集合,之后遍历集合,判断事件,当是接入事件时,拿到客户端通道,再将客户端通道注册为可读事件到选择器上,再次select,等待数据的准备。

当数据准备好后,selector再次返回调用后产生的选择键集合,遍历集合,这次是可读事件,判断为可读事件后,读取客户端通道数据,并回写给客户端。

这样我们就实现了一个简单的echo服务器,但是我们现在需要给这个服务器加点功能,让它能更好的实现对客户端信息的回写。

增加需求

我们规定我们的echo服务器的实现需要有以下特点:

  1. 服务器原样返回客户端发送的信息。
  2. 客户端发送的信息以’\r’作为一个消息的结尾,一个消息的最大长度不超过128。
  3. 客户端会一次发送多个消息,服务端需按顺序原样返回。

我们先来分析一下需求。

针对需求第二三,客户端发送的消息需要以“/r”作为消息的结尾。所以为了把服务器的消息区分开来,我们不能以定长来处理数据了。同时我们需要考虑一下tcp的拆包和粘包。

什么是tcp的拆包和粘包呢?tcp是面向流的协议,我们无法知道一个数据包的边界,所以在接收数据时,可能会因为一次数据包过大而分次填充到socket缓冲区,这就是tcp的拆包;而多个数据包一起读取并填充到socket的缓冲区中,便称为tcp的粘包。

有了以上分析,我们可以得到以下代码:

public class MainDemo2 {
    //处理拿到可读事件的socket
    static class clientProcessor implements Runnable {
        private Selector selector;
        public clientProcessor(Selector selector) {
            this.selector = selector;
        }
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false) {
                            continue;
                        }
                        if (key.isReadable()) {
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ByteBuffer readBuffer = (ByteBuffer) key.attachment();
                            int read = clientChannel.read(readBuffer);
                            if (read == -1) {
                                key.cancel();
                                clientChannel.close();
                            } else {
                                readBuffer.flip();
                                int position = readBuffer.position();
                                int limit = readBuffer.limit();
                                List<ByteBuffer> buffers = new ArrayList<>();
                                for (int i = position; i < limit; i++) {
                                    if (readBuffer.get(i) == '\r') ;
                                    {
                                        ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position() + 1);
                                        readBuffer.limit(i + 1);
                                        message.put(readBuffer);
                                        readBuffer.limit(limit);
                                        message.flip();
                                        buffers.add(message);
//                                        byte[] bytes = new byte[message.limit()];
//                                        message.get(bytes);
//                                        System.out.println(bytes);
                                    }
                                }
                                for (ByteBuffer message : buffers) {
                                //判断message是否有效
                                    while (message.hasRemaining()) {
                                        clientChannel.write(message);
                                    }
                                }
                                readBuffer.compact();
                            }

                        }
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        //新建服务端管道对象并设置为非阻塞
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //基于机器性能创建选择器数组
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        //初始化数组,依次启动线程
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new clientProcessor(selector)).start();
        }
        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                Selector selectorChild = selectors[id.getAndIncrement() % selectors.length];
                socketChannel.register(selectorChild, SelectionKey.OP_READ, ByteBuffer.allocate(256));
                selectorChild.wakeup();
                iterator.remove();
            }
        }
    }
}

如上代码,我们比之前的实现做了一些改动。

首先在注册可读事件到选择器时,我们带上了一个256长度的ByteBuffer。这个ByteBuffer专门服务于这个这个选择器。因为粘包和拆包的存在,一次读取可能的数据中可能有多个消息,也可能不足一个消息,所以我们选择了用一个ByteBuffer去累计,这样每次读取也会考虑到上一次读取剩下的数据。

同时我们在实现消息读取的时候,不再是定长读取,而是循环检查字节,判断消息是否结尾。每得到一个消息就存入集合中,最后遍历集合依次返回客户端。

上面的代码已经实现了我们的需求,但是却是有缺点的。客户端在写出消息时,如果客户端缓冲区已满,消息无法写出,程序会一直循环等待,很消耗性能。下面我们再来改进一下代码。

代码改进

按照上述改进思路,我们回写消息的时候,再给消息注册写入事件,优化后的代码如下:

public class MainDemo3 {
    static class ChannelBuffer {
        ByteBuffer readBuffer;
        ByteBuffer[] writeBuffers;
        List<ByteBuffer> list = new LinkedList<>();
    }

    //处理拿到可读事件的socket
    static class clientProcessor implements Runnable {
        private Selector selector;

        public clientProcessor(Selector selector) {
            this.selector = selector;
        }

        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false) {
                            continue;
                        }
                        if (key.isReadable()) {
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
                            ByteBuffer readBuffer = channelBuffer.readBuffer;
                            int read = clientChannel.read(readBuffer);
                            if (read == -1) {
                                key.cancel();
                                clientChannel.close();
                            } else {
                                readBuffer.flip();
                                int position = readBuffer.position();
                                int limit = readBuffer.limit();
                                List<ByteBuffer> buffers = new ArrayList<>();
                                for (int i = position; i < limit; i++) {
                                    if (readBuffer.get(i) == '\r') ;
                                    {
                                        ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position() + 1);
                                        readBuffer.limit(i + 1);
                                        message.put(readBuffer);
                                        readBuffer.limit(limit);
                                        message.flip();
                                        buffers.add(message);
//                                        byte[] bytes = new byte[message.limit()];
//                                        message.get(bytes);
//                                        System.out.println(bytes);
                                    }
                                }
                                if (channelBuffer.writeBuffers == null) {
                                    ByteBuffer[] byteBuffers = buffers.toArray(new ByteBuffer[buffers.size()]);
                                    clientChannel.write(byteBuffers);
                                    boolean hasRemaining = hasRemaining(byteBuffers);
                                    if (hasRemaining) {
                                        channelBuffer.writeBuffers = byteBuffers;
                                        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

                                    }
                                } else {
                                    //还有尚未发送完全的数据,新产生的数据需要放入队列
                                    channelBuffer.list.addAll(buffers);
                                }
                                readBuffer.compact();
                            }
                        }
                        if (key.isWritable()){
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            ChannelBuffer channelBuffer = (ChannelBuffer) key.attachment();
                            ByteBuffer[]  writeBuffers  = channelBuffer.writeBuffers;
                            clientChannel.write(writeBuffers);
                            boolean hasRemaining = hasRemaining(writeBuffers);
                            if (hasRemaining==false){
                                channelBuffer.writeBuffers = null;
                                List<ByteBuffer> list = channelBuffer.list;
                                if (!list.isEmpty()){
                                    writeBuffers = list.toArray(new ByteBuffer[list.size()]);
                                    list.clear();
                                    clientChannel.write(writeBuffers);
                                    if (hasRemaining(writeBuffers))
                                    {
                                        //仍然有数据没有完全写出,保留对可写事件的关注
                                    }
                                    else
                                    {
                                        //没有数据要写出了,取消对可写事件的关注
                                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                    }
                                }
                                else
                                {
                                    //没有数据要写出了,取消对可写事件的关注
                                    key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                                }
                            }
                        }
                        iterator.remove();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private boolean hasRemaining(ByteBuffer[] byteBuffers) {
            boolean hasRemaining = false;
            for (ByteBuffer byteBuffer : byteBuffers) {
                if (byteBuffer.hasRemaining()) {
                    hasRemaining = true;
                    break;
                }
            }
            return hasRemaining;
        }
    }

    public static void main(String[] args) throws IOException {
        //新建服务端管道对象并设置为非阻塞
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(9999));
        //基于机器性能创建选择器数组
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        //初始化数组,依次启动线程
        for (int i = 0; i < selectors.length; i++) {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new clientProcessor(selector)).start();
        }
        AtomicInteger id = new AtomicInteger();
        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                Selector selectorChild = selectors[id.getAndIncrement() % selectors.length];
                ChannelBuffer channelBuffer = new ChannelBuffer();
                socketChannel.register(selectorChild, SelectionKey.OP_READ, channelBuffer);
                selectorChild.wakeup();
                iterator.remove();
            }
        }
    }
}

如上代码,如果在回写消息的时候,如果一次没有写出,不再循环判断等待,而是注册写入事件到选择器。我们用一个list来维护消息回写的有序性,如果上次的数据还没回写完成,则把此次数据添加的list中等待下一次写入。

在判断为写入事件的代码中,我们先对消息进行一次回写,如果回写消息的writeBuffers已经回写完,则开始回写list中的消息,否则结束事件,等待下一次写入。

本文地址:https://blog.csdn.net/sen_sen97/article/details/109587985

《Netty学习笔记之用NIO实现一个echo服务器.doc》

下载本文的Word格式文档,以方便收藏与打印。