Netty实现远程调用RPC功能

2022-10-13,,,,

添加依赖

<dependency>
    <groupid>io.netty</groupid>
    <artifactid>netty-all</artifactid>
    <version>4.1.2.final</version>
</dependency>

<dependency>
    <groupid>org.reflections</groupid>
    <artifactid>reflections</artifactid>
    <version>0.9.10</version>
</dependency>

组织架构

服务端

封装类信息

public class classinfo implements serializable {

    private static final long serialversionuid = 1l;

    private string classname;  //类名
    private string methodname;//方法名
    private class<?>[] types; //参数类型
    private object[] objects;//参数列表

    public string getclassname() {
        return classname;
    }

    public void setclassname(string classname) {
        this.classname = classname;
    }

    public string getmethodname() {
        return methodname;
    }

    public void setmethodname(string methodname) {
        this.methodname = methodname;
    }

    public class<?>[] gettypes() {
        return types;
    }

    public void settypes(class<?>[] types) {
        this.types = types;
    }

    public object[] getobjects() {
        return objects;
    }

    public void setobjects(object[] objects) {
        this.objects = objects;
    }
}

服务端网络处理服务器

public class nettyrpcserver {
    private int port;
    public nettyrpcserver(int port) {
        this.port = port;
    }

    public void start() {
        eventloopgroup bossgroup = new nioeventloopgroup();
        eventloopgroup workergroup = new nioeventloopgroup();
        try {
            serverbootstrap serverbootstrap = new serverbootstrap();
            serverbootstrap.group(bossgroup, workergroup)
                    .channel(nioserversocketchannel.class)
                    .option(channeloption.so_backlog, 128)
                    .childoption(channeloption.so_keepalive, true)
                    .localaddress(port).childhandler(
                            new channelinitializer<socketchannel>() {
                                @override
                                protected void initchannel(socketchannel ch) throws exception {
                                    channelpipeline pipeline = ch.pipeline();
                                    //编码器
                                    pipeline.addlast("encoder", new objectencoder());
                                    //解码器
                                    pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null)));
                                    //服务器端业务处理类
                                    pipeline.addlast(new invokehandler());
                                }
                            });
            channelfuture future = serverbootstrap.bind(port).sync();
            system.out.println("......server is ready......");
            future.channel().closefuture().sync();
        } catch (exception e) {
            bossgroup.shutdowngracefully();
            workergroup.shutdowngracefully();
        }
    }

    public static void main(string[] args) throws exception {
        new nettyrpcserver(9999).start();
    }
}

服务器端业务处理类

public class invokehandler extends channelinboundhandleradapter {
    //得到某接口下某个实现类的名字
    private string getimplclassname(classinfo classinfo) throws exception{
        //服务方接口和实现类所在的包路径
        string interfacepath="com.lyz.server";
        int lastdot = classinfo.getclassname().lastindexof(".");
        string interfacename=classinfo.getclassname().substring(lastdot);
        class superclass=class.forname(interfacepath+interfacename);
        reflections reflections = new reflections(interfacepath);
        //得到某接口下的所有实现类
        set<class> implclassset=reflections.getsubtypesof(superclass);
        if(implclassset.size()==0){
            system.out.println("未找到实现类");
            return null;
        }else if(implclassset.size()>1){
            system.out.println("找到多个实现类,未明确使用哪一个");
            return null;
        }else {
            //把集合转换为数组
            class[] classes=implclassset.toarray(new class[0]);
            return classes[0].getname(); //得到实现类的名字
        }
    }

    @override  //读取客户端发来的数据并通过反射调用实现类的方法
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        classinfo classinfo = (classinfo) msg;
        system.out.println(classinfo);
        object clazz = class.forname(getimplclassname(classinfo)).newinstance();
        method method = clazz.getclass().getmethod(classinfo.getmethodname(), classinfo.gettypes());
        //通过反射调用实现类的方法
        object result = method.invoke(clazz, classinfo.getobjects());
        ctx.writeandflush(result);
    }
}

服务端接口及实现类

// 无参接口
public interface hellonetty {
    string hello();
}

// 实现类
public class hellonettyimpl implements hellonetty {
    @override
    public string hello() {
        return "hello,netty";
    }
}

// 带参接口
public interface hellorpc {
    string hello(string name);
}

// 实现类
public class hellorpcimpl implements hellorpc {
    @override
    public string hello(string name) {
        return "hello," + name;
    }
}

客户端

代理类

public class nettyrpcproxy {
    //根据接口创建代理对象
    public static object create(class target) {
        return proxy.newproxyinstance(target.getclassloader(), new class[]{target}, new invocationhandler() {
            @override
            public object invoke(object proxy, method method, object[] args)
                    throws throwable {
                //封装classinfo
                classinfo classinfo = new classinfo();
                classinfo.setclassname(target.getname());
                classinfo.setmethodname(method.getname());
                classinfo.setobjects(args);
                classinfo.settypes(method.getparametertypes());

                //开始用netty发送数据
                eventloopgroup group = new nioeventloopgroup();
                resulthandler resulthandler = new resulthandler();
                try {
                    bootstrap b = new bootstrap();
                    b.group(group)
                            .channel(niosocketchannel.class)
                            .handler(new channelinitializer<socketchannel>() {
                                @override
                                public void initchannel(socketchannel ch) throws exception {
                                    channelpipeline pipeline = ch.pipeline();
                                    //编码器
                                    pipeline.addlast("encoder", new objectencoder());
                                    //解码器  构造方法第一个参数设置二进制数据的最大字节数  第二个参数设置具体使用哪个类解析器
                                    pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null)));
                                    //客户端业务处理类
                                    pipeline.addlast("handler", resulthandler);
                                }
                            });
                    channelfuture future = b.connect("127.0.0.1", 9999).sync();
                    future.channel().writeandflush(classinfo).sync();
                    future.channel().closefuture().sync();
                } finally {
                    group.shutdowngracefully();
                }
                return resulthandler.getresponse();
            }
        });
    }
}

客户端业务处理类

public class resulthandler extends channelinboundhandleradapter {

    private object response;
    public object getresponse() {
        return response;
    }

    @override //读取服务器端返回的数据(远程调用的结果)
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        response = msg;
        ctx.close();
    }
}

客户端接口

// 无参接口
public interface hellonetty {
    string hello();
}

// 带参接口
public interface hellorpc {
    string hello(string name);
}

测试类 服务调用方

public class testnettyrpc {
    public static void main(string [] args){

        //第1次远程调用
        hellonetty hellonetty=(hellonetty) nettyrpcproxy.create(hellonetty.class);
        system.out.println(hellonetty.hello());

        //第2次远程调用
        hellorpc hellorpc =  (hellorpc) nettyrpcproxy.create(hellorpc.class);
        system.out.println(hellorpc.hello("rpc"));

    }
}

输出结果

服务端

......server is ready......
com.lyz.serverstub.classinfo@2b894733
com.lyz.serverstub.classinfo@167bfa9

客户端

hello,netty
hello,rpc

下一篇通过netty实现线上聊天功能


《Netty实现远程调用RPC功能.doc》

下载本文的Word格式文档,以方便收藏与打印。