Netty介绍与认识

2023-06-14,

    概述

Netty是由JBOSS提供的一个java开源框架。
Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

  2.体系结构图

Netty的核心结构

Netty是典型的Reactor模型结构,在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认NioWorker的个数是当前服务器的可用核数)。

在处理新来的请求时,NioWorker读完已收到的数据到ChannelBuffer中,之后触发ChannelPipeline中的ChannelHandler流。

Netty是事件驱动的,可以通过ChannelHandler链来控制执行流向。因为ChannelHandler链的执行过程是在subReactor中同步的,所以如果业务处理handler耗时长,将严重影响可支持的并发数。

发一下Netty的架构
 

Server1

public static void main(String[] args) throws Exception {
//启动server服务
new NettyServer().bind(8089);
}

NettyServer

  public void bind(int port) throws Exception {

EventLoopGroup bossGroup = new NioEventLoopGroup(); //bossGroup就是parentGroup,是负责处理TCP/IP连接的
EventLoopGroup workerGroup = new NioEventLoopGroup(); //workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件

ServerBootstrap sb = new ServerBootstrap();
sb.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128) //初始化服务端可连接队列,指定了队列的大小128
.childOption(ChannelOption.SO_KEEPALIVE, true) //保持长连接
.childHandler(new ChannelInitializer<SocketChannel>() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel sh) throws Exception {
// sh.pipeline().addLast(new StringEncoder(Charset.forName("gbk")));
// sh.pipeline().addLast(new StringDecoder(Charset.forName("gbk")));
sh.pipeline().addLast("decoder", new MyDecode());
sh.pipeline() .addLast(new ServerHandler()); //使用ServerHandler类来处理接收到的消息
}
});
//绑定监听端口,调用sync同步阻塞方法等待绑定操作完
ChannelFuture future = sb.bind(port).sync();

if (future.isSuccess()) {
System.out.println("服务端启动成功");
} else {
System.out.println("服务端启动失败");
future.cause().printStackTrace();
bossGroup.shutdownGracefully(); //关闭线程组
workerGroup.shutdownGracefully();
}

//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
future.channel().closeFuture().sync();

}

MyDecode 是我自己编写的一个解析16进制的类,编码器
public class MyDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
byte[] b = new byte[byteBuf.readableBytes()];
//复制内容到字节数组b
byteBuf.readBytes(b);
//字节数组转字符串
String str = new String(b);
list.add(bytesToHexString(b));
}

public String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2)
sb.append(0);
sb.append(sTemp.toUpperCase());
}
return sb.toString();
}

public static String toHexString1(byte[] b) {
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < b.length; ++i) {
buffer.append(toHexString1(b[i]));
}
return buffer.toString();
}

public static String toHexString1(byte b) {
String s = Integer.toHexString(b & 0xFF);
if (s.length() == 1) {
return "0" + s;
} else {
return s;
}
}

public static byte[] hexString2Bytes(String hex) {
if ((hex == null) || (hex.equals(""))){
return null;
}else if (hex.length()%2 != 0){
return null;
}else{
hex = hex.toUpperCase();
int len = hex.length()/2;
byte[] b = new byte[len];
char[] hc = hex.toCharArray();
for (int i=0; i<len; i++){
int p=2*i;
b[i] = (byte) (charToByte(hc[p]) << 4 | charToByte(hc[p+1]));
}
return b;
}
}

private static byte charToByte(char c) {
return (byte) "0123456789ABCDEF".indexOf(c);
}

public static void main(String[] args) throws Exception {
String fan="烦";
String str = URLEncoder.encode(fan, "utf-8").replaceAll("%", "");
System.err.println(str);

}
}

ServerHandler


@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
try {
  System.out.println(msg.toString);
} catch (Exception e) {
e.printStackTrace();
System.err.println(e.getMessage());
}
}

// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// RpcRequest request = (RpcRequest) msg;
// System.out.println("接收到客户端信息:" + request.toString());
// //返回的数据结构
// RpcResponse response = new RpcResponse();
// response.setId(UUID.randomUUID().toString());
// response.setData("server响应结果");
// response.setStatus(1);
// ctx.writeAndFlush(response);
// }

//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}

//读操作时捕获到异常时调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}

//客户端去和服务端连接成功时触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

try {
int strLen = 34;
StringBuffer sb = null;
while (classpath.length() < strLen) {
sb = new StringBuffer();
//sb.append("0").append(_16);// 左补0
sb.append(classpath).append("0");//右补0
classpath = sb.toString();
}
System.err.println("sb=" + sb);
String str = "ADBA000160002546323160ACC70014CB142701" + classpath + "FE";
String byte16 = "ADBA000120000B020195FE";
ByteBuf bytebuf = Unpooled.buffer(16);
ByteBuf bytebuf1 = Unpooled.buffer(16);
byte[] bytes = MyDecode.hexString2Bytes(byte16);
byte[] bytes1 = MyDecode.hexString2Bytes(str);
bytebuf1.writeBytes(bytes1);
bytebuf.writeBytes(bytes);
ctx.writeAndFlush(bytebuf);
for (int i = 0; i < 2; i++) {
if (i == 1) {
ctx.writeAndFlush(bytebuf + "\n");
} else {
//ctx.writeAndFlush(bytebuf1 + "\n");
}
}
ctx.flush();
} catch (Exception e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
}

  代码就上了,这样16进制和读不到的问题就解决了。

Netty介绍与认识的相关教程结束。

《Netty介绍与认识.doc》

下载本文的Word格式文档,以方便收藏与打印。