学习Netty的相关知识,使用Netty实现了一个简单的群聊系统,使用Protobuf进行传输,支持空闲心跳检测,并且可以自定义群聊名称,所以写了此篇文章,做个知识记录。
代码结构
代码
代码中我添加了详细的注释,所以接下来我就不进行解释了,直接贴代码:
1:客户端
/**
* @Author: wlc
* @Date: 2020/7/19 17:51
* @Description: Netty 群聊客户端
**/
public class GroupChatClient {
// 客户端属性
private final String host;
private final int port;
private String userName = "默认用户" + (int)(Math.random() * 100);
public GroupChatClient(String host, int port){
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 获取pipeline
ChannelPipeline pipeline = sc.pipeline();
// 向pipeline中加入Protobuf解码器
pipeline.addLast("decoder", new ProtobufDecoder(MsgInfo.Msg.getDefaultInstance()));
// 向pipeline中加入Protobuf编码器
pipeline.addLast("encoder", new ProtobufEncoder());
// 加入自定义处理器handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture sync = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = sync.channel();
System.out.println(channel.localAddress() + "欢迎进入聊天室");
System.out.println("请先设置聊天名称:(命令:name=XXX)");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
if (msg.startsWith("name=")){
userName = msg.substring(5);
} else {
MsgInfo.Msg msgInfo = MsgInfo.Msg.newBuilder().setUserName(userName).setMsg(msg).build();
// 通过channel 发送到服务器端
channel.writeAndFlush(msgInfo);
}
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 6666).run();
}
}
2:客户端处理器
/**
* @Author: wlc
* @Date: 2020/7/19 18:24
* @Description: Netty 群聊客户端处理器
**/
public class GroupChatClientHandler extends SimpleChannelInboundHandler<MsgInfo.Msg>{
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MsgInfo.Msg msg) throws Exception {
System.out.println(msg.getUserName() + "发送消息:");
System.out.println(msg.getMsg());
}
}
3:服务端
/**
* @Author: wlc
* @Date: 2020/7/19 10:47
* @Description: Netty 群聊服务端
**/
public class GroupChatServer {
private int port;
public GroupChatServer(int port){
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();// 默认为几个 Runtime.getRuntime().availableProcessors()*2
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();// 启动辅助类,通过他可以很方便的创建一个Netty服务端
serverBootstrap.group(bossGroup, workGroup)// 设置bossGroup和workGroup
.channel(NioServerSocketChannel.class)// 指定使用NioServerSocketChannel来处理连接请求
.handler(new LoggingHandler(LogLevel.INFO))// 给bossGroup提供一个日志处理器
.option(ChannelOption.SO_BACKLOG, 128) // 临时存放已完成三次握手的请求的队列的最大长度,超过后新连接将会被TCP内核拒绝。默认54
.childOption(ChannelOption.SO_KEEPALIVE, true)// 是否启用心跳保活机制。在双方TCP套接字建立连接后(即都进入ESTABLISHED状态)并且在两个小时左右上层没有任何数据传输的情况下,这套机制才会被激活。
.childHandler(new ChannelInitializer<SocketChannel>() {// 配置入站、出站事件handler
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 获取pipeline
ChannelPipeline pipeline = sc.pipeline();
// 向pipeline中加入Protobuf解码器
pipeline.addLast("decoder", new ProtobufDecoder(MsgInfo.Msg.getDefaultInstance()));
// 向pipeline中加入Protobuf编码器
pipeline.addLast("encoder", new ProtobufEncoder());
// 加入自定义处理器handler
pipeline.addLast(new GroupChatServerHandler());
/* netty提供的空闲状态的处理器
1:多长时间没有读,就会发送一个心跳检测是否连接
2:多长时间没有写,就会发送一个心跳检测是否连接
3:多长时间有没读写,就会发送一个心跳检测是否连接
4:时间单位
5:当被触发后,就会传递给管道的下一个handler去处理
*/
pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.MINUTES));
// 加入针对空闲处理的handler
pipeline.addLast(new GroupChatHeartHandler());
}
});
System.out.println("Netty 服务已启动");
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();// ServerBootstrap.bind → AbstractBootstrap.bind → AbstractBootstrap.doBind
// 监听关闭
channelFuture.channel().closeFuture().sync();// 开启了一个子线程server channel的监听器,负责监听channel是否关闭的状态
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(6666).run();
}
}
4:服务端处理器
/**
* @Author: wlc
* @Date: 2020/7/19 17:09
* @Description: Netty 群聊服务端处理器
**/
public class GroupChatServerHandler extends SimpleChannelInboundHandler<MsgInfo.Msg>{
// 定义一个channle组,管理所有的channel
// GlobalEventExecutor.INSTANCE 单例的全局事件执行器 帮助执行channelGroup
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 当链接建立,第一个被执行的方法
// 将当前channel 加入到channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 获取channel
Channel channel = ctx.channel();
// 上线信息推送给其他客户端
// 该方法会将channelGroup中所有的channel遍历,并发送
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天\n");
// 加入到channelGroup
channelGroup.add(channel);
}
// 标识channel 处于活动状态,提示xx上线
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "上线!");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "离线!");
}
// 断开链接后会被触发
// 执行过后,channelGroup会自动将相应的channel移除,不需要手动移除
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "已离开\n");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MsgInfo.Msg msg) throws Exception {
// 获取当前channel
Channel channel = channelHandlerContext.channel();
System.out.println("[服务器]接受" + msg.getUserName() + "发送的消息:" + msg.getMsg());
// 遍历channelGroup,发送消息
channelGroup.forEach(ch -> {
if (ch != channel){
ch.writeAndFlush(msg);
} else {
ch.writeAndFlush(msg);
}
});
}
// 异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
5:空闲检测处理器
/**
* @Author: wlc
* @Date: 2020/7/20 21:08
* @Description: 群聊心跳处理
**/
public class GroupChatHeartHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
// 将evt向下转型为IdleStateEvent
IdleStateEvent event = (IdleStateEvent)evt;
String eventType = null;
switch (event.state()){
case READER_IDLE: eventType = "读空闲";
break;
case WRITER_IDLE: eventType = "写空闲";
break;
case ALL_IDLE: eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "发生:" + eventType + "事件,需要处理");
}
}
}
6:消息体
由于消息体我用的是proto,所以我就只贴出来转换前的代码,大家可以看一下,转化后的代码太长了,就不贴了。
syntax = "proto3"; // 指定使用 proto3 语法
option java_outer_classname = "MsgInfo"; // 指定类名
message Msg {
string userName = 1;
string msg = 2;
}
本文地址:https://blog.csdn.net/wanglc7/article/details/107898338