Netty:实现群聊系统+自定义名称+空闲检测+Protobuf

2022-07-31,,,,

学习Netty的相关知识,使用Netty实现了一个简单的群聊系统,使用Protobuf进行传输,支持空闲心跳检测,并且可以自定义群聊名称,所以写了此篇文章,做个知识记录。

代码结构

代码

代码中我添加了详细的注释,所以接下来我就不进行解释了,直接贴代码:

1:客户端

/**
 * @Author: wlc
 * @Date: 2020/7/19 17:51
 * @Description: Netty 群聊客户端
 **/
public class GroupChatClient {

    // 客户端属性
    private final String host;
    private final int port;
    private String userName = "默认用户" + (int)(Math.random() * 100);

    public GroupChatClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // 获取pipeline
                            ChannelPipeline pipeline = sc.pipeline();
                            // 向pipeline中加入Protobuf解码器
                            pipeline.addLast("decoder", new ProtobufDecoder(MsgInfo.Msg.getDefaultInstance()));
                            // 向pipeline中加入Protobuf编码器
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            // 加入自定义处理器handler
                            pipeline.addLast(new GroupChatClientHandler());

                        }
                    });
            ChannelFuture sync = bootstrap.connect(host, port).sync();
            //得到channel
            Channel channel = sync.channel();

            System.out.println(channel.localAddress() + "欢迎进入聊天室");
            System.out.println("请先设置聊天名称:(命令:name=XXX)");

            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()){
                String msg = scanner.nextLine();
                if (msg.startsWith("name=")){
                    userName = msg.substring(5);
                } else {
                    MsgInfo.Msg msgInfo = MsgInfo.Msg.newBuilder().setUserName(userName).setMsg(msg).build();
                    // 通过channel 发送到服务器端
                    channel.writeAndFlush(msgInfo);
                }
            }
        } finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {

        new GroupChatClient("127.0.0.1", 6666).run();
    }

}

2:客户端处理器

/**
 * @Author: wlc
 * @Date: 2020/7/19 18:24
 * @Description: Netty 群聊客户端处理器
 **/
public class GroupChatClientHandler extends SimpleChannelInboundHandler<MsgInfo.Msg>{
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MsgInfo.Msg msg) throws Exception {
        System.out.println(msg.getUserName() + "发送消息:");
        System.out.println(msg.getMsg());
    }
}

3:服务端

/**
 * @Author: wlc
 * @Date: 2020/7/19 10:47
 * @Description: Netty 群聊服务端
 **/
public class GroupChatServer {

    private int port;

    public GroupChatServer(int port){
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup();// 默认为几个 Runtime.getRuntime().availableProcessors()*2

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();// 启动辅助类,通过他可以很方便的创建一个Netty服务端
            serverBootstrap.group(bossGroup, workGroup)// 设置bossGroup和workGroup
                    .channel(NioServerSocketChannel.class)// 指定使用NioServerSocketChannel来处理连接请求
                    .handler(new LoggingHandler(LogLevel.INFO))// 给bossGroup提供一个日志处理器
                    .option(ChannelOption.SO_BACKLOG, 128) // 临时存放已完成三次握手的请求的队列的最大长度,超过后新连接将会被TCP内核拒绝。默认54
                    .childOption(ChannelOption.SO_KEEPALIVE, true)// 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。
                    .childHandler(new ChannelInitializer<SocketChannel>() {// 配置入站、出站事件handler
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {

                            // 获取pipeline
                            ChannelPipeline pipeline = sc.pipeline();
                            // 向pipeline中加入Protobuf解码器
                            pipeline.addLast("decoder", new ProtobufDecoder(MsgInfo.Msg.getDefaultInstance()));
                            // 向pipeline中加入Protobuf编码器
                            pipeline.addLast("encoder", new ProtobufEncoder());
                            // 加入自定义处理器handler
                            pipeline.addLast(new GroupChatServerHandler());

                            /* netty提供的空闲状态的处理器
                               1:多长时间没有读,就会发送一个心跳检测是否连接
                               2:多长时间没有写,就会发送一个心跳检测是否连接
                               3:多长时间有没读写,就会发送一个心跳检测是否连接
                               4:时间单位
                               5:当被触发后,就会传递给管道的下一个handler去处理
                            */
                            pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.MINUTES));
                            // 加入针对空闲处理的handler
                            pipeline.addLast(new GroupChatHeartHandler());
                        }
                    });
            System.out.println("Netty 服务已启动");
            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();// ServerBootstrap.bind → AbstractBootstrap.bind → AbstractBootstrap.doBind
            // 监听关闭
            channelFuture.channel().closeFuture().sync();// 开启了一个子线程server channel的监听器,负责监听channel是否关闭的状态
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatServer(6666).run();
    }
}

4:服务端处理器

/**
 * @Author: wlc
 * @Date: 2020/7/19 17:09
 * @Description: Netty 群聊服务端处理器
 **/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<MsgInfo.Msg>{

    // 定义一个channle组,管理所有的channel
    // GlobalEventExecutor.INSTANCE 单例的全局事件执行器 帮助执行channelGroup
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    // 当链接建立,第一个被执行的方法
    // 将当前channel 加入到channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 获取channel
        Channel channel = ctx.channel();
        // 上线信息推送给其他客户端
        // 该方法会将channelGroup中所有的channel遍历,并发送
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天\n");
        // 加入到channelGroup
        channelGroup.add(channel);
    }

    // 标识channel 处于活动状态,提示xx上线
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "上线!");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + "离线!");
    }

    // 断开链接后会被触发
    // 执行过后,channelGroup会自动将相应的channel移除,不需要手动移除
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "已离开\n");
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, MsgInfo.Msg msg) throws Exception {
        // 获取当前channel
        Channel channel = channelHandlerContext.channel();
        System.out.println("[服务器]接受" + msg.getUserName() + "发送的消息:" + msg.getMsg());
        // 遍历channelGroup,发送消息
        channelGroup.forEach(ch -> {
            if (ch != channel){
                ch.writeAndFlush(msg);
            } else {
                ch.writeAndFlush(msg);
            }
        });
    }

    // 异常处理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

5:空闲检测处理器

/**
 * @Author: wlc
 * @Date: 2020/7/20 21:08
 * @Description: 群聊心跳处理
 **/
public class GroupChatHeartHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            // 将evt向下转型为IdleStateEvent
            IdleStateEvent event = (IdleStateEvent)evt;
            String eventType = null;
            switch (event.state()){
                case READER_IDLE: eventType = "读空闲";
                break;
                case WRITER_IDLE: eventType = "写空闲";
                break;
                case ALL_IDLE: eventType = "读写空闲";
                break;
            }
            System.out.println(ctx.channel().remoteAddress() + "发生:" + eventType + "事件,需要处理");

        }

    }
}

6:消息体

由于消息体我用的是proto,所以我就只贴出来转换前的代码,大家可以看一下,转化后的代码太长了,就不贴了。

syntax = "proto3";  // 指定使用 proto3 语法


option java_outer_classname = "MsgInfo"; // 指定类名

message Msg { 
  string userName = 1;
  string msg = 2;
}

本文地址:https://blog.csdn.net/wanglc7/article/details/107898338

《Netty:实现群聊系统+自定义名称+空闲检测+Protobuf.doc》

下载本文的Word格式文档,以方便收藏与打印。