添加依赖
<dependency> <groupid>io.netty</groupid> <artifactid>netty-all</artifactid> <version>4.1.2.final</version> </dependency> <dependency> <groupid>org.reflections</groupid> <artifactid>reflections</artifactid> <version>0.9.10</version> </dependency>
组织架构
服务端
封装类信息
public class classinfo implements serializable { private static final long serialversionuid = 1l; private string classname; //类名 private string methodname;//方法名 private class<?>[] types; //参数类型 private object[] objects;//参数列表 public string getclassname() { return classname; } public void setclassname(string classname) { this.classname = classname; } public string getmethodname() { return methodname; } public void setmethodname(string methodname) { this.methodname = methodname; } public class<?>[] gettypes() { return types; } public void settypes(class<?>[] types) { this.types = types; } public object[] getobjects() { return objects; } public void setobjects(object[] objects) { this.objects = objects; } }
服务端网络处理服务器
public class nettyrpcserver { private int port; public nettyrpcserver(int port) { this.port = port; } public void start() { eventloopgroup bossgroup = new nioeventloopgroup(); eventloopgroup workergroup = new nioeventloopgroup(); try { serverbootstrap serverbootstrap = new serverbootstrap(); serverbootstrap.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 128) .childoption(channeloption.so_keepalive, true) .localaddress(port).childhandler( new channelinitializer<socketchannel>() { @override protected void initchannel(socketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); //编码器 pipeline.addlast("encoder", new objectencoder()); //解码器 pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null))); //服务器端业务处理类 pipeline.addlast(new invokehandler()); } }); channelfuture future = serverbootstrap.bind(port).sync(); system.out.println("......server is ready......"); future.channel().closefuture().sync(); } catch (exception e) { bossgroup.shutdowngracefully(); workergroup.shutdowngracefully(); } } public static void main(string[] args) throws exception { new nettyrpcserver(9999).start(); } }
服务器端业务处理类
public class invokehandler extends channelinboundhandleradapter { //得到某接口下某个实现类的名字 private string getimplclassname(classinfo classinfo) throws exception{ //服务方接口和实现类所在的包路径 string interfacepath="com.lyz.server"; int lastdot = classinfo.getclassname().lastindexof("."); string interfacename=classinfo.getclassname().substring(lastdot); class superclass=class.forname(interfacepath+interfacename); reflections reflections = new reflections(interfacepath); //得到某接口下的所有实现类 set<class> implclassset=reflections.getsubtypesof(superclass); if(implclassset.size()==0){ system.out.println("未找到实现类"); return null; }else if(implclassset.size()>1){ system.out.println("找到多个实现类,未明确使用哪一个"); return null; }else { //把集合转换为数组 class[] classes=implclassset.toarray(new class[0]); return classes[0].getname(); //得到实现类的名字 } } @override //读取客户端发来的数据并通过反射调用实现类的方法 public void channelread(channelhandlercontext ctx, object msg) throws exception { classinfo classinfo = (classinfo) msg; system.out.println(classinfo); object clazz = class.forname(getimplclassname(classinfo)).newinstance(); method method = clazz.getclass().getmethod(classinfo.getmethodname(), classinfo.gettypes()); //通过反射调用实现类的方法 object result = method.invoke(clazz, classinfo.getobjects()); ctx.writeandflush(result); } }
服务端接口及实现类
// 无参接口 public interface hellonetty { string hello(); } // 实现类 public class hellonettyimpl implements hellonetty { @override public string hello() { return "hello,netty"; } } // 带参接口 public interface hellorpc { string hello(string name); } // 实现类 public class hellorpcimpl implements hellorpc { @override public string hello(string name) { return "hello," + name; } }
客户端
代理类
public class nettyrpcproxy { //根据接口创建代理对象 public static object create(class target) { return proxy.newproxyinstance(target.getclassloader(), new class[]{target}, new invocationhandler() { @override public object invoke(object proxy, method method, object[] args) throws throwable { //封装classinfo classinfo classinfo = new classinfo(); classinfo.setclassname(target.getname()); classinfo.setmethodname(method.getname()); classinfo.setobjects(args); classinfo.settypes(method.getparametertypes()); //开始用netty发送数据 eventloopgroup group = new nioeventloopgroup(); resulthandler resulthandler = new resulthandler(); try { bootstrap b = new bootstrap(); b.group(group) .channel(niosocketchannel.class) .handler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); //编码器 pipeline.addlast("encoder", new objectencoder()); //解码器 构造方法第一个参数设置二进制数据的最大字节数 第二个参数设置具体使用哪个类解析器 pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null))); //客户端业务处理类 pipeline.addlast("handler", resulthandler); } }); channelfuture future = b.connect("127.0.0.1", 9999).sync(); future.channel().writeandflush(classinfo).sync(); future.channel().closefuture().sync(); } finally { group.shutdowngracefully(); } return resulthandler.getresponse(); } }); } }
客户端业务处理类
public class resulthandler extends channelinboundhandleradapter { private object response; public object getresponse() { return response; } @override //读取服务器端返回的数据(远程调用的结果) public void channelread(channelhandlercontext ctx, object msg) throws exception { response = msg; ctx.close(); } }
客户端接口
// 无参接口 public interface hellonetty { string hello(); } // 带参接口 public interface hellorpc { string hello(string name); }
测试类 服务调用方
public class testnettyrpc { public static void main(string [] args){ //第1次远程调用 hellonetty hellonetty=(hellonetty) nettyrpcproxy.create(hellonetty.class); system.out.println(hellonetty.hello()); //第2次远程调用 hellorpc hellorpc = (hellorpc) nettyrpcproxy.create(hellorpc.class); system.out.println(hellorpc.hello("rpc")); } }
输出结果
服务端
......server is ready...... com.lyz.serverstub.classinfo@2b894733 com.lyz.serverstub.classinfo@167bfa9
客户端
hello,netty hello,rpc
下一篇通过netty实现线上聊天功能