天天看點

RPC服務治理架構(一)RPC技術

一、RPC是什麼

remote procedure call:遠端過程調用

過程就是程式,像調用本地方法一樣調用遠端的過程

RPC采用Client-Server結構,通過request-response消息模式實作

RMI(remote method invocation)遠端方法調用是OOP領域中RPC的一種具體實作

webservice、RESTful接口調用都是RPC,僅消息組織方式及消息協定不同

與本地調用相比,速度相對較慢、可靠性減弱

為什麼用RPC

  • 服務化
  • 可重用
  • 系統間互動調用

術語

RPC服務治理架構(一)RPC技術

二、RPC的流程環節

RPC服務治理架構(一)RPC技術

 1.用戶端處理過程中調用Client stub,傳遞參數

2.Client stub将參數編組為消息,然後通過系統調用向服務端發送消息

3.用戶端本地作業系統将消息從用戶端機器發送到服務端機器

4.服務端作業系統接收到資料包傳遞給Server stub

5.Server stub解組消息為參數

6.Server stub再調用服務端的過程,過程執行結果以反方向的相同的步驟響應給用戶端

需要處理的問題

1.Client stub、Server stub的開發

2.參數如何編組為消息,以及解組消息

3.消息如何發送

4.過程結果如何表示、異常情況如何處理

5.如何實作安全的通路控制

三、RPC協定

RPC調用過程中需要将參數編組為消息進行發送,接收方需要解組消息為參數,過程處理結果同樣需要編組、解組。消息由哪些部分構成及消息的表示形式就構成了消息協定。

RPC調用過程中采用的消息協定稱為RPC協定。

RPC服務治理架構(一)RPC技術

常見RPC協定

RPC服務治理架構(一)RPC技術

四、手寫RPC架構

RPC架構是封裝好參數編組、消息解組、底層網絡通信的RPC程式開發架構,帶來的便捷是開發者可以直接在其基礎上開發,隻需專注于過程代碼的編寫

RPC服務治理架構(一)RPC技術

從使用者角度開始

RPC服務治理架構(一)RPC技術

 2.1 用戶端

2.1.1 用戶端設計

用戶端生成過程接口的代理對象

設計用戶端代理工廠,用JDK動态代理即可生成接口的代理對象

RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術

 ServiceInfoDiscoverer接口用于得到服務資訊,傳回服務資訊的清單;為了支援大并發,某個服務可能有多個提供者,并發量很大時需要用到叢集

ServiceInfo,服務的名稱,服務協定

根據需要提供服務資訊發現者,動态可以使用zookeeper

RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術

 消息協定獨立為一層,用戶端、服務端均需要

RPC服務治理架構(一)RPC技術

用戶端完整類圖

RPC服務治理架構(一)RPC技術

 不同顔色代表不同層,入口是ClientStubProxyFactory

2.1.2 實作用戶端

RPC服務治理架構(一)RPC技術
package com.study.mike.rpc.client;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

import com.study.mike.rpc.client.net.NetClient;
import com.study.mike.rpc.common.protocol.MessageProtocol;
import com.study.mike.rpc.common.protocol.Request;
import com.study.mike.rpc.common.protocol.Response;
import com.study.mike.rpc.discovery.ServiceInfo;
import com.study.mike.rpc.discovery.ServiceInfoDiscoverer;

/**
 * Factory that creates, and caches, client-side stub proxies for remote service interfaces.
 *
 * <p>Every proxy delegates to a {@link ClientStubInvocationHandler}, which resolves a provider
 * through the {@link ServiceInfoDiscoverer}, marshals the call with the provider's
 * {@link MessageProtocol}, sends it through the {@link NetClient} and unmarshals the reply.
 *
 * <p>The discoverer, the protocol map and the net client are injected through the setters and
 * are dereferenced on every remote call, so they must be set before a proxy is used.
 */
public class ClientStubProxyFactory {

    private ServiceInfoDiscoverer sid;

    /** Supported message protocols, keyed by the protocol name published in {@link ServiceInfo}. */
    private Map<String, MessageProtocol> supportMessageProtocols;

    private NetClient netClient;

    /** Proxies created so far, keyed by service interface. Plain HashMap, no synchronization. */
    private Map<Class<?>, Object> objectCache = new HashMap<>();

    /**
     * Returns the stub proxy for the given service interface, creating and caching it on first use.
     *
     * @param interf the remote service interface
     * @return a JDK dynamic proxy implementing {@code interf}
     */
    @SuppressWarnings("unchecked") // the value cached under key interf is always a proxy implementing interf
    public <T> T getProxy(Class<T> interf) {
        T obj = (T) this.objectCache.get(interf);
        if (obj == null) {
            obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
                    new ClientStubInvocationHandler(interf));
            this.objectCache.put(interf, obj);
        }

        return obj;
    }

    public ServiceInfoDiscoverer getSid() {
        return sid;
    }

    public void setSid(ServiceInfoDiscoverer sid) {
        this.sid = sid;
    }

    public Map<String, MessageProtocol> getSupportMessageProtocols() {
        return supportMessageProtocols;
    }

    public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
        this.supportMessageProtocols = supportMessageProtocols;
    }

    public NetClient getNetClient() {
        return netClient;
    }

    public void setNetClient(NetClient netClient) {
        this.netClient = netClient;
    }

    /**
     * Invocation handler behind every stub proxy: turns an interface call into a remote request.
     */
    private class ClientStubInvocationHandler implements InvocationHandler {
        private Class<?> interf;

        private Random random = new Random();

        public ClientStubInvocationHandler(Class<?> interf) {
            super();
            this.interf = interf;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

            // toString/hashCode/equals reach the handler with java.lang.Object as declaring class.
            // They describe the local proxy object and must never be sent over the wire.
            if (method.getDeclaringClass() == Object.class) {
                return invokeObjectMethod(proxy, method, args);
            }

            // 1. Look up the providers of this service (the service name is the interface name).
            String serviceName = this.interf.getName();
            List<ServiceInfo> sinfos = sid.getServiceInfo(serviceName);

            if (sinfos == null || sinfos.size() == 0) {
                throw new Exception("遠端服務不存在!");
            }

            // Pick one provider at random (soft load balancing).
            ServiceInfo sinfo = sinfos.get(random.nextInt(sinfos.size()));

            // 2. Build the request object.
            Request req = new Request();
            req.setServiceName(sinfo.getName());
            req.setMethod(method.getName());
            req.setPrameterTypes(method.getParameterTypes());
            req.setParameters(args);

            // 3. Protocol layer: find the protocol announced by the provider and marshal the request.
            MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
            if (protocol == null) {
                // Fail with a descriptive error instead of a bare NullPointerException below.
                throw new IllegalStateException("Unsupported message protocol '" + sinfo.getProtocol()
                        + "' announced by service " + serviceName);
            }
            byte[] data = protocol.marshallingRequest(req);

            // 4. Network layer: send the request and wait for the raw reply.
            byte[] repData = netClient.sendRequest(data, sinfo);

            // 5. Unmarshal the response message.
            Response rsp = protocol.unmarshallingResponse(repData);

            // 6. Result handling: a remote exception is rethrown to the local caller.
            if (rsp.getException() != null) {
                throw rsp.getException();
            }

            return rsp.getReturnValue();
        }

        /**
         * Answers the {@code java.lang.Object} methods locally. Equality is proxy identity and the
         * hash code is the identity hash, which keeps the equals/hashCode contract consistent.
         */
        private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
            switch (method.getName()) {
            case "toString":
                return proxy.getClass().toString();
            case "hashCode":
                return System.identityHashCode(proxy);
            case "equals":
                return proxy == args[0];
            default:
                throw new UnsupportedOperationException(method.toString());
            }
        }
    }
}
package com.study.mike.rpc.client.net;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.study.mike.rpc.discovery.ServiceInfo;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Netty based {@link NetClient}.
 *
 * <p>Every call to {@link #sendRequest(byte[], ServiceInfo)} creates its own event loop group and
 * connection, writes the request once the channel becomes active, blocks until a reply (or a
 * failure) is observed and then shuts the event loop group down again.
 */
public class NettyNetClient implements NetClient {

    private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

    /**
     * Sends the marshalled request to the provider at {@code sinfo.getAddress()} ("host:port").
     *
     * @param data  marshalled request bytes
     * @param sinfo provider to contact
     * @return the raw bytes of the reply
     * @throws IllegalArgumentException if the address is not of the form host:port
     * @throws Throwable                connection errors, channel errors, interruption while
     *                                  waiting, or the connection closing before a reply arrived
     */
    @Override
    public byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable {

        // Split on the LAST colon so that hosts that themselves contain ':' still parse.
        String address = sinfo.getAddress();
        int sep = address.lastIndexOf(':');
        if (sep <= 0 || sep == address.length() - 1) {
            throw new IllegalArgumentException("Invalid service address (expected host:port): " + address);
        }
        String host = address.substring(0, sep);
        int port = Integer.parseInt(address.substring(sep + 1));

        SendHandler sendHandler = new SendHandler(data);
        byte[] respData = null;
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();

            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(sendHandler);
                        }
                    });

            // Connect, then block until the handler has a reply or a failure to report.
            b.connect(host, port).sync();
            respData = (byte[]) sendHandler.rspData();
            logger.info("sendRequest get reply: " + (respData == null ? 0 : respData.length) + " bytes");

        } finally {
            // Release the event loop threads (this also closes the channel).
            group.shutdownGracefully();
        }

        return respData;
    }

    /**
     * Channel handler that writes the request when the channel becomes active and hands the reply
     * (or the failure) to the thread blocked in {@link #rspData()}.
     *
     * <p>NOTE(review): there is no message framing; the first inbound buffer is treated as the
     * complete reply. A reply split over several reads would be truncated.
     */
    private class SendHandler extends ChannelInboundHandlerAdapter {

        private final CountDownLatch cdl = new CountDownLatch(1);
        private final byte[] data;
        // Both fields are written before cdl.countDown() and read after cdl.await().
        private Object readMsg = null;
        private Throwable error = null;

        public SendHandler(byte[] data) {
            this.data = data;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("連接配接服務端成功:" + ctx);
            ByteBuf reqBuf = Unpooled.buffer(data.length);
            reqBuf.writeBytes(data);
            logger.info("用戶端發送消息:" + reqBuf);
            ctx.writeAndFlush(reqBuf);
        }

        /**
         * Blocks until a reply or a failure is available.
         *
         * @return the reply bytes
         * @throws Throwable the channel failure, or InterruptedException if the wait is interrupted
         */
        public Object rspData() throws Throwable {
            cdl.await();
            if (error != null) {
                throw error;
            }
            return readMsg;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("client read msg: " + msg);
            ByteBuf msgBuf = (ByteBuf) msg;
            try {
                byte[] resp = new byte[msgBuf.readableBytes()];
                msgBuf.readBytes(resp);
                readMsg = resp;
            } finally {
                // This handler is the last consumer of the inbound buffer, so it must release it.
                msgBuf.release();
            }
            // Only reached on success; a failure above is reported through exceptionCaught.
            cdl.countDown();
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // The peer closed the connection without answering: wake the caller instead of hanging.
            if (cdl.getCount() > 0) {
                error = new java.io.IOException("Connection closed before a response was received");
                cdl.countDown();
            }
            super.channelInactive(ctx);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            logger.error("發生異常:" + cause.getMessage(), cause);
            // Hand the failure to the waiting caller; without this rspData() would block forever.
            if (cdl.getCount() > 0) {
                error = cause;
                cdl.countDown();
            }
            // Close the connection when an exception is raised.
            ctx.close();
        }
    }
}
package com.study.mike.rpc.client.net;

import com.study.mike.rpc.discovery.ServiceInfo;

/**
 * Network layer used by the client stub to exchange raw message bytes with a service provider.
 */
public interface NetClient {
    /**
     * Sends an already marshalled request to the provider described by {@code sinfo} and returns
     * the raw bytes of the reply.
     *
     * @param data  marshalled request bytes
     * @param sinfo service information of the provider to contact
     * @return the raw, still marshalled, response bytes
     * @throws Throwable any failure raised while sending the request or receiving the reply
     */
    byte[] sendRequest(byte[] data, ServiceInfo sinfo) throws Throwable;
}
package com.study.mike.rpc.discovery;

/**
 * Mutable bean describing one provider of a service: the service name, the name of the message
 * protocol it speaks and the address it listens on.
 */
public class ServiceInfo {

    /** Service name. */
    private String name;

    /** Name of the message protocol used by the provider. */
    private String protocol;

    /** Provider address. */
    private String address;

    /** @return the service name, or {@code null} if not set */
    public String getName() {
        return this.name;
    }

    /** @param name the service name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the protocol name, or {@code null} if not set */
    public String getProtocol() {
        return this.protocol;
    }

    /** @param protocol the protocol name */
    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    /** @return the provider address, or {@code null} if not set */
    public String getAddress() {
        return this.address;
    }

    /** @param address the provider address */
    public void setAddress(String address) {
        this.address = address;
    }

}
package com.study.mike.rpc.discovery;

import java.util.List;

/**
 * Looks up the providers that are available for a service.
 */
public interface ServiceInfoDiscoverer {
    /**
     * Returns the service information of the providers of the named service.
     *
     * @param name the service name
     * @return one {@link ServiceInfo} per provider
     */
    List<ServiceInfo> getServiceInfo(String name);
}
package com.study.mike.rpc.discovery;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

import org.I0Itec.zkclient.ZkClient;

import com.alibaba.fastjson.JSON;
import com.study.mike.rpc.server.register.MyZkSerializer;
import com.study.mike.rpc.util.PropertiesUtils;

/**
 * {@link ServiceInfoDiscoverer} backed by ZooKeeper.
 *
 * <p>Providers are read from the children of {@code /Rpc-framework/<service name>/service}; each
 * child node name is a URL-encoded JSON representation of a {@link ServiceInfo}.
 */
public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {

    ZkClient client;

    private String centerRootPath = "/Rpc-framework";

    /** Connects to the ZooKeeper address configured under the {@code zk.address} property. */
    public ZookeeperServiceInfoDiscoverer() {
        String addr = PropertiesUtils.getProperties("zk.address");
        client = new ZkClient(addr);
        client.setZkSerializer(new MyZkSerializer());
    }

    /**
     * Returns the providers currently registered for the named service.
     *
     * @param name the service name
     * @return the registered providers; an empty list when the service has no node in ZooKeeper
     */
    @Override
    public List<ServiceInfo> getServiceInfo(String name) {
        String servicePath = centerRootPath + "/" + name + "/service";
        List<ServiceInfo> resources = new ArrayList<ServiceInfo>();

        // Asking for the children of a missing node raises an error; a service that was never
        // registered simply has no providers.
        if (!client.exists(servicePath)) {
            return resources;
        }

        List<String> children = client.getChildren(servicePath);
        if (children == null) {
            return resources;
        }
        for (String ch : children) {
            resources.add(JSON.parseObject(decode(ch), ServiceInfo.class));
        }
        return resources;
    }

    /** URL-decodes a child node name using UTF-8. */
    private static String decode(String encoded) {
        try {
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 support is mandatory on every Java platform, so this cannot happen in practice.
            throw new IllegalStateException("UTF-8 charset is not supported", e);
        }
    }

}
package com.study.mike.rpc.common.protocol;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/**
 * {@link MessageProtocol} that marshals requests and responses with Java native serialization.
 *
 * <p>SECURITY NOTE(review): the unmarshalling methods run {@link ObjectInputStream#readObject()}
 * on bytes received from the network. Native deserialization of untrusted data is dangerous;
 * only use this protocol between trusted peers, or add an object input filter.
 */
public class JavaSerializeMessageProtocol implements MessageProtocol {

    /** Serializes {@code obj} into a byte array. */
    private byte[] serialize(Object obj) throws Exception {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        // Closing the object stream flushes its internal buffer before the bytes are read back.
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(obj);
        }

        return bout.toByteArray();
    }

    /** Deserializes a single object from {@code data}. */
    private Object deserialize(byte[] data) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    @Override
    public byte[] marshallingRequest(Request req) throws Exception {
        return this.serialize(req);
    }

    @Override
    public Request unmarshallingRequest(byte[] data) throws Exception {
        return (Request) this.deserialize(data);
    }

    @Override
    public byte[] marshallingResponse(Response rsp) throws Exception {
        return this.serialize(rsp);
    }

    @Override
    public Response unmarshallingResponse(byte[] data) throws Exception {
        return (Response) this.deserialize(data);
    }

}
package com.study.mike.rpc.demo.consumer;

import java.awt.Point;
import java.util.HashMap;
import java.util.Map;

import com.study.mike.rpc.client.ClientStubProxyFactory;
import com.study.mike.rpc.client.net.NettyNetClient;
import com.study.mike.rpc.common.protocol.JavaSerializeMessageProtocol;
import com.study.mike.rpc.common.protocol.MessageProtocol;
import com.study.mike.rpc.demo.DemoService;
import com.study.mike.rpc.discovery.ZookeeperServiceInfoDiscoverer;

/**
 * Demo consumer: wires a {@link ClientStubProxyFactory} and calls {@link DemoService} remotely.
 */
public class Consumer {
    public static void main(String[] args) throws Exception {

        // Message protocols the client understands, keyed by protocol name.
        Map<String, MessageProtocol> protocols = new HashMap<>();
        protocols.put("javas", new JavaSerializeMessageProtocol());

        // Assemble the stub factory: service discovery, protocol layer and network layer.
        ClientStubProxyFactory factory = new ClientStubProxyFactory();
        factory.setSid(new ZookeeperServiceInfoDiscoverer());
        factory.setSupportMessageProtocols(protocols);
        factory.setNetClient(new NettyNetClient());

        // Obtain the proxy of the remote service.
        DemoService remote = factory.getProxy(DemoService.class);

        // Invoke the remote methods and print their results.
        String greeting = remote.sayHello("world");
        System.out.println(greeting);

        System.out.println(remote.multiPoint(new Point(5, 10), 2));
    }
}

2.2 服務端

2.2.1 設計服務端

RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術
RPC服務治理架構(一)RPC技術

2.2.2 實作服務端

RPC服務治理架構(一)RPC技術
package com.study.mike.rpc.demo.provider;

import com.study.mike.rpc.common.protocol.JavaSerializeMessageProtocol;
import com.study.mike.rpc.demo.DemoService;
import com.study.mike.rpc.server.NettyRpcServer;
import com.study.mike.rpc.server.RequestHandler;
import com.study.mike.rpc.server.RpcServer;
import com.study.mike.rpc.server.register.ServiceObject;
import com.study.mike.rpc.server.register.ServiceRegister;
import com.study.mike.rpc.server.register.ZookeeperExportServiceRegister;
import com.study.mike.rpc.util.PropertiesUtils;

/**
 * Demo provider: registers {@link DemoService}, serves it with a {@link NettyRpcServer} and stops
 * the server when a key is pressed on standard input.
 */
public class Provider {
    public static void main(String[] args) throws Exception {

        int port = Integer.parseInt(PropertiesUtils.getProperties("rpc.port"));
        String protocol = PropertiesUtils.getProperties("rpc.protocol");

        // Service registration.
        ServiceRegister sr = new ZookeeperExportServiceRegister();
        DemoService ds = new DemoServiceImpl();
        ServiceObject so = new ServiceObject(DemoService.class.getName(), DemoService.class, ds);
        sr.register(so, protocol, port);

        RequestHandler reqHandler = new RequestHandler(new JavaSerializeMessageProtocol(), sr);

        final RpcServer server = new NettyRpcServer(port, protocol, reqHandler);

        // NettyRpcServer.start() waits for the server channel to close, so it does not return
        // while the server is running. Run it on its own thread; otherwise the key press below
        // could never be read and stop() could never be reached.
        Thread serverThread = new Thread(() -> {
            try {
                server.start();
            } catch (Exception e) {
                System.err.println("RPC server terminated abnormally: " + e);
            }
        }, "rpc-server");
        serverThread.start();

        System.in.read(); // press any key to exit
        server.stop();
    }
}

配置端口

app.properties

zk.address=127.0.0.1:2181

rpc.port=19000
rpc.protocol=javas      
package com.study.mike.rpc.server.register;

import java.util.HashMap;
import java.util.Map;

/**
 * In-memory {@link ServiceRegister}: keeps the registered {@link ServiceObject}s in a map keyed by
 * their name. The protocol name and port passed to {@code register} are not used by this class.
 */
public class DefaultServiceRegister implements ServiceRegister {

    private Map<String, ServiceObject> serviceMap = new HashMap<>();

    /**
     * Stores the service object under its name, replacing any previous entry of the same name.
     *
     * @throws IllegalArgumentException if {@code so} is {@code null}
     */
    @Override
    public void register(ServiceObject so, String protocolName, int port) throws Exception {
        if (null == so) {
            throw new IllegalArgumentException("參數不能為空");
        }

        final String key = so.getName();
        serviceMap.put(key, so);
    }

    /**
     * @return the service object registered under {@code name}, or {@code null} if there is none
     */
    @Override
    public ServiceObject getServiceObject(String name) {
        ServiceObject found = serviceMap.get(name);
        return found;
    }

}
package com.study.mike.rpc.server.register;

import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLEncoder;

import org.I0Itec.zkclient.ZkClient;

import com.alibaba.fastjson.JSON;
import com.study.mike.rpc.discovery.ServiceInfo;
import com.study.mike.rpc.util.PropertiesUtils;

/**
 * Service register that, in addition to the in-memory registration done by
 * {@link DefaultServiceRegister}, publishes every registered service in ZooKeeper.
 *
 * <p>A service is published as an ephemeral node below
 * {@code /Rpc-framework/<interface name>/service}; the node name is the URL-encoded JSON form of
 * the service's {@link ServiceInfo}.
 */
public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {

    private ZkClient client;

    private String centerRootPath = "/Rpc-framework";

    /** Connects to the ZooKeeper address configured under the {@code zk.address} property. */
    public ZookeeperExportServiceRegister() {
        client = new ZkClient(PropertiesUtils.getProperties("zk.address"));
        client.setZkSerializer(new MyZkSerializer());
    }

    /**
     * Registers the service locally, then exports it to ZooKeeper with the local host address,
     * the given port and the given protocol name.
     */
    @Override
    public void register(ServiceObject so, String protocolName, int port) throws Exception {
        super.register(so, protocolName, port);

        ServiceInfo info = new ServiceInfo();
        info.setAddress(InetAddress.getLocalHost().getHostAddress() + ":" + port);
        // The exported name is the interface name (the local registration uses so.getName()).
        info.setName(so.getInterf().getName());
        info.setProtocol(protocolName);

        exportService(info);
    }

    /** Creates the ephemeral node describing {@code serviceResource}, replacing a stale one. */
    private void exportService(ServiceInfo serviceResource) {
        String encoded = JSON.toJSONString(serviceResource);
        try {
            encoded = URLEncoder.encode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        // Persistent parent node of all providers of this service.
        String parentPath = centerRootPath + "/" + serviceResource.getName() + "/service";
        boolean parentMissing = !client.exists(parentPath);
        if (parentMissing) {
            client.createPersistent(parentPath, true);
        }

        // Ephemeral node for this provider; remove a leftover node of the same name first.
        String nodePath = parentPath + "/" + encoded;
        if (client.exists(nodePath)) {
            client.delete(nodePath);
        }
        client.createEphemeral(nodePath);
    }
}
package com.study.mike.rpc.server;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty based {@link RpcServer}.
 *
 * <p>{@link #start()} binds the port and then blocks until the server channel is closed, so
 * {@link #stop()} has to be called from another thread.
 */
public class NettyRpcServer extends RpcServer {
    private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);

    // Written by the thread blocked in start() and read by the thread calling stop().
    private volatile Channel channel;

    public NettyRpcServer(int port, String protocol, RequestHandler handler) {
        super(port, protocol, handler);
    }

    /**
     * Binds the server port and serves requests until the server channel is closed. Failures are
     * logged; the event loop groups are always shut down before this method returns.
     */
    @Override
    public void start() {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ChannelRequestHandler());
                        }
                    });

            // Start the service.
            ChannelFuture f = b.bind(port).sync();
            logger.info("完成服務端端口綁定與啟動");
            channel = f.channel();
            // Wait until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller of start().
            Thread.currentThread().interrupt();
            logger.error("發生異常:" + e.getMessage(), e);
        } catch (Exception e) {
            logger.error("發生異常:" + e.getMessage(), e);
        } finally {
            // Release the event loop threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /** Closes the server channel, which lets {@link #start()} return. No-op if not bound. */
    @Override
    public void stop() {
        Channel ch = this.channel;
        if (ch != null) {
            ch.close();
        }
    }

    /** Per-connection handler: passes the request bytes to the request handler and writes the reply. */
    private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info("激活");
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            logger.info("服務端收到消息:" + msg);
            ByteBuf msgBuf = (ByteBuf) msg;
            byte[] req;
            try {
                req = new byte[msgBuf.readableBytes()];
                msgBuf.readBytes(req);
            } finally {
                // The request is not passed further down the pipeline, so release it here.
                msgBuf.release();
            }
            byte[] res = handler.handleRequest(req);
            ByteBuf respBuf = Unpooled.buffer(res.length);
            respBuf.writeBytes(res);
            // Log the response that is actually sent (not the request).
            logger.info("發送響應:" + respBuf);
            ctx.write(respBuf);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // Close the connection when an exception is raised.
            logger.error("發生異常:" + cause.getMessage(), cause);
            ctx.close();
        }
    }

}
package com.study.mike.rpc.server;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import com.study.mike.rpc.common.protocol.MessageProtocol;
import com.study.mike.rpc.common.protocol.Request;
import com.study.mike.rpc.common.protocol.Response;
import com.study.mike.rpc.common.protocol.Status;
import com.study.mike.rpc.server.register.ServiceObject;
import com.study.mike.rpc.server.register.ServiceRegister;

/**
 * Server-side request processing: unmarshals a request, invokes the target service method by
 * reflection and marshals the outcome into a response.
 */
public class RequestHandler {
    private MessageProtocol protocol;

    private ServiceRegister serviceRegister;

    public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
        super();
        this.protocol = protocol;
        this.serviceRegister = serviceRegister;
    }

    /**
     * Handles one request.
     *
     * @param data marshalled request bytes
     * @return marshalled response bytes: {@code NOT_FOUND} when no service is registered under
     *         the requested name, {@code SUCCESS} with the return value, or {@code ERROR} with
     *         the exception raised by the lookup or by the service method
     * @throws Exception if the request cannot be unmarshalled or the response cannot be marshalled
     */
    public byte[] handleRequest(byte[] data) throws Exception {
        // 1. Unmarshal the message.
        Request req = this.protocol.unmarshallingRequest(data);

        // 2. Look up the service object.
        ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());

        Response rsp = null;

        if (so == null) {
            rsp = new Response(Status.NOT_FOUND);
        } else {
            // 3. Invoke the requested method by reflection.
            try {
                Method m = so.getInterf().getMethod(req.getMethod(), req.getPrameterTypes());
                Object returnValue = m.invoke(so.getObj(), req.getParameters());
                rsp = new Response(Status.SUCCESS);
                rsp.setReturnValue(returnValue);
            } catch (InvocationTargetException e) {
                // The service method itself threw. Report its own exception so the client sees
                // the real failure rather than the reflection wrapper. A non-Exception throwable
                // (an Error) is still reported through the wrapper.
                Throwable target = e.getTargetException();
                rsp = new Response(Status.ERROR);
                rsp.setException(target instanceof Exception ? (Exception) target : e);
            } catch (NoSuchMethodException | SecurityException | IllegalAccessException
                    | IllegalArgumentException e) {
                rsp = new Response(Status.ERROR);
                rsp.setException(e);
            }
        }

        // 4. Marshal the response message.
        return this.protocol.marshallingResponse(rsp);
    }

    public MessageProtocol getProtocol() {
        return protocol;
    }

    public void setProtocol(MessageProtocol protocol) {
        this.protocol = protocol;
    }

    public ServiceRegister getServiceRegister() {
        return serviceRegister;
    }

    public void setServiceRegister(ServiceRegister serviceRegister) {
        this.serviceRegister = serviceRegister;
    }

}

轉載于:https://www.cnblogs.com/aidata/p/11589082.html