天天看点

基于 Netty 手写 RPC

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序 上请求服务,而不需要了解底层网络实现的技术。常见的RPC 框架有: 源自阿里的Dubbo, g ,Google 出品的grpc 等等。

1,服务消费方(Client)以本地调用方式调用服务

2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

3. client stub 将消息进行编码并发送到服务端

4. server stub 收到消息后进行解码

5. server stub 根据解码结果调用本地的服务

6. 本地服务执行并将结果返回给server stub

7. server stub 将返回导入结果进行编码并发送至消费方

8. client stub 接收到消息并进行解码

9. 服务消费方(client)得到结果

Client(服务的调用方): 两个接口+ 一个包含main 方法的测试类
 Client Sub: 一个客户端代理类+ 一个客户端业务处理类
Server(服务的提供方): 两个接口+ 两个实现类
Server Sub: 一个网络处理服务器+ 一个服务器业务处理类
 注意:服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致),最终要实现的目标是:在TestNettyRPC 中远程调用HelloRPCImpl 或HelloNettyImpl 中的方法
           

上述代码作为服务的提供方,我们分别编写了两个接口和两个实现类,供消费方远程调用

服务器端业务处理类

网络处理服务器      
public class NettyRpcServer {
private int port;

public NettyRpcServer(int port) {
this.port = port;
    }

public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup wprkGroup = new NioEventLoopGroup();
try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, wprkGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .localAddress(port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
//编码器
                            pipeline.addLast("encoder", new ObjectEncoder())
//解码器
                                    .addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
                                    .addLast(new InvokeHandler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("......server is ready......");
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            bossGroup.shutdownGracefully();
            wprkGroup.shutdownGracefully();
        }
    }
public static void main(String[] args) {
new NettyRpcServer(9999).start();
    }
}
           

public class ResultHandler extends ChannelInboundHandlerAdapter {

private Object response;

public Object getResponse() {

return response;

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

response = msg;

ctx.close();

public class NettyRpcProxy {

//根据结构创建代理对象

public static Object create(Class target) {

return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

//封装ClassInfo

ClassInfo classInfo = new ClassInfo();

classInfo.setClassName(target.getName());

classInfo.setMethodName(method.getName());

classInfo.setObjects(args);

classInfo.setTypes(method.getParameterTypes());

//开始用Netty发送数据

EventLoopGroup group = new NioEventLoopGroup();

ResultHandler resultHandler = new ResultHandler();

try {

Bootstrap bootstrap = new Bootstrap();

bootstrap.group(group)

.channel(NioSocketChannel.class)

.handler(new ChannelInitializer<SocketChannel>() {

protected void initChannel(SocketChannel ch) throws Exception {

ChannelPipeline pipeline = ch.pipeline();

//编码器

pipeline.addLast("encoder", new ObjectEncoder())

//解码器,构造方法第一个参数设置二进制的最大字节数,第二个参数设置具体使用哪个类解析器

.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))

//客户端业务处理类

.addLast("handler", resultHandler);

});

ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();

future.channel().writeAndFlush(classInfo).sync();

future.channel().closeFuture().sync();

} finally {

group.shutdownGracefully();

return resultHandler.getResponse();

public interface HelloNetty {

String hello();

public interface HelloRPC {

String hello(String name);

rpc