Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器

2022-11-13,,,,

服务端

package org.zln.netty.five.timer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; /**
* 时间服务器服务端
* Created by sherry on 16/11/5.
*/
public class TimerServer {
/**
* 服务端绑定端口号
*/
private int PORT; public TimerServer(int PORT) {
this.PORT = PORT;
} /**
* 日志
*/
private static Logger logger = LoggerFactory.getLogger(TimerServer.class); public void bind() {
/*
NioEventLoopGroup是线程池组
包含了一组NIO线程,专门用于网络事件的处理
bossGroup:服务端,接收客户端连接
workGroup:进行SocketChannel的网络读写
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
/*
ServerBootstrap:用于启动NIO服务的辅助类,目的是降低服务端的开发复杂度
*/
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)//配置TCP参数,能够设置很多,这里就只设置了backlog=1024,
.childHandler(new TimerServerInitializer());//绑定I/O事件处理类
logger.debug("绑定端口号:" + PORT + ",等待同步成功");
/*
bind:绑定端口
sync:同步阻塞方法,等待绑定完成,完成后返回 ChannelFuture ,主要用于通知回调
*/
ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
logger.debug("等待服务端监听窗口关闭");
/*
closeFuture().sync():为了阻塞,服务端链路关闭后才退出.也是一个同步阻塞方法
*/
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} finally {
logger.debug("优雅退出,释放线程池资源");
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

TimerServer

 package org.zln.netty.five.timer;

 import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; /**
* Created by sherry on 16/11/5.
*/
public class TimerServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new TimerServerHandler()); }
}

TimerServerInitializer

 package org.zln.netty.five.timer;

 import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat;
import java.util.Date; /**
* Handler主要用于对网络事件进行读写操作,是真正的业务类
* 通常只需要关注 channelRead 和 exceptionCaught 方法
* Created by sherry on 16/11/5.
*/
public class TimerServerHandler extends ChannelHandlerAdapter { /**
* 日志
*/
private Logger logger = LoggerFactory.getLogger(TimerServerHandler.class); private static int count = 0; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String body = (String) msg;
logger.debug("第 "+(++count)+" 次收到请求 - "+body); String timeNow = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date())+System.lineSeparator(); //获取发送给客户端的数据
ByteBuf resBuf = Unpooled.copiedBuffer(timeNow.getBytes("UTF-8")); ctx.writeAndFlush(resBuf);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将消息发送队列中的消息写入到SocketChannel中发送给对方
logger.debug("channelReadComplete");
ctx.flush();
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//发生异常时,关闭 ChannelHandlerContext,释放ChannelHandlerContext 相关的句柄等资源
logger.error(cause.getMessage(),cause);
ctx.close();
}
}

TimerServerHandler

客户端

 package org.zln.netty.five.timer;

 import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; /**
* 时间服务器客户端
* Created by sherry on 16/11/5.
*/
public class TimerClient {
/**
* 日志
*/
private Logger logger = LoggerFactory.getLogger(TimerServer.class); private String HOST;
private int PORT; public TimerClient(String HOST, int PORT) {
this.HOST = HOST;
this.PORT = PORT;
} public void connect(){
//配置客户端NIO线程组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new TimerClientInitializer());
//发起异步连接操作
logger.debug("发起异步连接操作 - start");
ChannelFuture channelFuture = bootstrap.connect(HOST,PORT).sync();
logger.debug("发起异步连接操作 - end");
//等待客户端链路关闭
logger.debug("等待客户端链路关闭 - start");
channelFuture.channel().closeFuture().sync();
logger.debug("等待客户端链路关闭 - end");
} catch (InterruptedException e) {
logger.error(e.getMessage(),e);
}finally {
//优雅的关闭
eventLoopGroup.shutdownGracefully();
}
}
}

TimerClient

 package org.zln.netty.five.timer;

 import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder; /**
* Created by sherry on 16/11/5.
*/
public class TimerClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(1024));
pipeline.addLast(new StringDecoder());
pipeline.addLast(new TimerClientHandler());
}
}

TimerClientInitializer

 package org.zln.netty.five.timer;

 import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; /**
* Created by sherry on 16/11/5.
*/
public class TimerClientHandler extends ChannelHandlerAdapter { /**
* 日志
*/
private Logger logger = LoggerFactory.getLogger(TimerClientHandler.class); private static int count = 0; @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("客户端连接上了服务端"); //发送请求
ByteBuf reqBuf = null;
for (int i = 0; i < 100; i++) {
reqBuf = getReq("GET TIME"+System.lineSeparator());
ctx.writeAndFlush(reqBuf);
} } /**
* 将字符串包装成ByteBuf
* @param s
* @return
*/
private ByteBuf getReq(String s) throws UnsupportedEncodingException {
byte[] data = s.getBytes("UTF-8");
ByteBuf reqBuf = Unpooled.buffer(data.length);
reqBuf.writeBytes(data);
return reqBuf;
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
logger.debug("这是收到的第 "+(++count)+" 笔响应 -- "+body);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

TimerClientHandler

这里主要使用 LineBasedFrameDecoder 和 StringDecoder 来实现解决粘包问题

原理如下:

  LineBasedFrameDecoder 依次遍历 ByteBuf 中的可读字节,判断是否有 \n 或 \r\n,如果有,就作为结束位置。从可读索引到结束位置区间的字节组成一行。它是以换行符为结束标志的解码器。支持携带结束符或者不懈怠结束符两种解码方式。同时支持配置单行的最大长度。如果读取到了最大长度仍旧没有发现换行符,就会抛出异常,同时忽略掉之前读到的数据。

  StringDecoder 的作用就是讲接收到的对象转化成字符串,然后继续调用handler。这样就不需要再handler中手动将对象转化成字符串了,直接强制转化就行。

  LineBasedFrameDecoder+StringDecoder组合就是按行切割的文本解码器,用来解决TCP的粘包和拆包问题。

  

Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器的相关教程结束。

《Netty解决TCP粘包/拆包问题 - 按行分隔字符串解码器.doc》

下载本文的Word格式文档,以方便收藏与打印。