天天看點

基于netty實作rpc架構-spring boot服務端

基于netty實作rpc架構-spring boot服務端

demo位址

https://gitee.com/syher/grave-netty

RPC介紹

首先了解一下RPC:遠端過程調用。簡單點說就是本地應用可以調用遠端伺服器的接口。那麼通過什麼方式調用遠端接口呢?說白了RPC隻是一種概念。他的調用可以基于HTTP實作,也可以基于TCP/IP實作。甚至私人定制的通訊協定。

當然,私人定制通訊協定成本過高且不具備通用性。我們不做展開讨論(其實我也展不開。。。)。那為什麼不使用HTTP協定呢?受限于HTTP協定層級過高,資料傳輸效率不如TCP/IP。是以RPC遠端調用一般采用TCP/IP實作。即調用socket方法。

RPC實作原理

  1. 用戶端發起遠端服務調用。
  2. 用戶端将類資訊、調用方法和入參資訊通過socket通道發送給服務端。
  3. 服務端解析資料包,調用本地接口。

5.将執行結果通過socket傳回給用戶端。

6.用戶端拿到并解析傳回結果。

RPC實作

java如何實作一個rpc架構,其實就是按照上面的原理再做一些詳細的補充。比如通過動态代理封裝用戶端的資料包、通過反射機制實作服務端實作類的調用等等。

今天,我們先基于spring boot + netty 做rpc服務端的實作。

首先,做一個注解用于辨別接口提供rpc調用。

1

2

3

4

5

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

public @interface Service {

    String name() default "";

}

該注解用于提供服務的實作類上。

public interface INettyService {

    String getString();

其實作類:

6

7

8

9

package com.braska.grave.netty.server.service;

@Service // 該注解為自定義rpc服務注解

public class NettyService implements INettyService {

    @Override

    public String getString() {

        return "welcome to use netty rpc.";

    }

接着,定義一個注解用來掃描指定包名下的Service注解。

@Target({ElementType.TYPE})

@Documented

@Import({NettyServerScannerRegistrar.class, NettyServerApplicationContextAware.class})

public @interface NettyServerScan {

    String[] basePackages();

該注解用于spring boot啟動類上,參數basePackages指定服務所在的包路徑。

10

11

@SpringBootApplication

@NettyServerScan(basePackages = {

        "com.braska.grave.netty.server.service"

})

public class GraveNettyServerApplication {

    public static void main(String[] args) {

        SpringApplication.run(GraveNettyServerApplication.class, args);

NettyServerScannerRegistrar類處理服務的spring bean注冊。

12

13

14

15

16

17

18

19

20

21

22

23

24

25

public class NettyServerScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar, ResourceLoaderAware {

    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {         // 建立掃描器執行個體

        NettyServerInterfaceScanner scanner = new NettyServerInterfaceScanner(registry);

        if (this.resourceLoader != null) {

            scanner.setResourceLoader(this.resourceLoader);

        }

        AnnotationAttributes annoAttrs =

                AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyServerScan.class.getName()));

        List basePackages = new ArrayList();

        for (String pkg : annoAttrs.getStringArray("basePackages")) {

            if (StringUtils.hasText(pkg)) {

                basePackages.add(pkg);

            }

     // 隻掃描指定的注解。

        scanner.setAnnotationClass(Service.class);

        scanner.registerFilters();     // 将basePackages裡面的通過@Service注解的類注冊成spring bean。

        scanner.doScan(StringUtils.toStringArray(basePackages));

NettyServerApplicationContextAware類,暴露socket server端口。

26

27

28

29

public class NettyServerApplicationContextAware implements ApplicationContextAware, InitializingBean {

    private static final Logger logger = Logger.getLogger(NettyServerApplicationContextAware.class.getName());   // 存儲接口與實作類的映射,其中key是接口名。value是實作類的bean。

    private Map serviceMap = new HashMap<>();   // 服務worker。包含netty socket服務端生命周期及讀寫。

    ServerWorker runner;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        String address = applicationContext.getEnvironment().getProperty("remoteAddress");

        Map beans = applicationContext.getBeansWithAnnotation(Service.class);

        for (Object serviceBean : beans.values()) {

            Class<?> clazz = serviceBean.getClass();

            Class<?>[] interfaces = clazz.getInterfaces();

            for (Class<?> inter : interfaces) {

                String interfaceName = inter.getName();

                serviceMap.put(interfaceName, serviceBean);

        }     // 建立netty worker對象

        runner = new ServerWorker(address, serviceMap);

    public void afterPropertiesSet() throws Exception {     // 建立netty socketServer及通道處理器

        runner.open();

ServerWorker類的open方法。

public class ServerWorker extends ChannelInitializer {

// socket ip:port private String remoteAddress;

// 實作類的beanMap private Map serviceMap;

// netty channel處理器 NettyServerHandler handler;public void open() { try { int parallel = Runtime.getRuntime().availableProcessors() * 2; ServerBootstrap bootstrap = new ServerBootstrap(); this.bossGroup = new NioEventLoopGroup(); // todo 使用線程池,提高并發能力 this.workerGroup = new NioEventLoopGroup(parallel); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(this); String[] hostAndPort = this.remoteAddress.split(":"); if (hostAndPort == null || hostAndPort.length != 2) { throw new RuntimeException("remoteAddress is error."); } ChannelFuture cf = bootstrap.bind(hostAndPort[0], Integer.parseInt(hostAndPort[1])).sync(); // todo 資訊寫入注冊中心 // registry.register(serverAddress); logger.info("netty 伺服器啟動.監聽端口:" + hostAndPort[1]); // 等待服務端監聽端口關閉 cf.channel().closeFuture().sync(); } catch (Exception e) { logger.log(Level.SEVERE, "netty server open failed.", e); this.bossGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); } } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JSONEncoder()); pipeline.addLast(new JSONDecoder()); pipeline.addLast(this.handler); } }

NettyServerHandler服務端channel處理器,繼承ChannelInboundHandlerAdapter。

30

31

32

33

@ChannelHandler.Sharable

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private Map serviceMap;

    public NettyServerHandler(Map serviceMap) {

        this.serviceMap = serviceMap;

    public void channelRead(ChannelHandlerContext ctx, Object msg) {     // 解析用戶端發送過來的資料。包含類名、方法名、入參等資訊。

        Request request = JSON.parseObject(msg.toString(), Request.class);

        Response response = new Response();

        response.setRequestId(request.getId());

        try {       // 調用本地實作類

            Object res = this.handler(request);

            response.setData(res);

        } catch (Exception e) {

            response.setCode(-1);

            response.setError(e.getMessage());

            logger.log(Level.SEVERE, "請求調用失敗", e);

        }     // 傳回處理結果給用戶端

        ctx.writeAndFlush(response);

    private Object handler(Request request) throws Exception {

        String className = request.getClassName();     // 通過className從beanMap映射中找到托管給spring的bean實作類。

        Object serviceBean = serviceMap.get(className);

        String methodName = request.getMethodName();

        Object[] parameters = request.getParameters();     // 通過反射機制調用實作類。并傳回調用結果。

        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);

至此,rpc服務端的實作就完成了。

一路看下來,服務端的代碼實作還是比較簡單的。核心代碼隻有兩個類:ServerWorker和NettyServerHandler。其餘的都是對spring bean注冊的支援。

EOF

本文作者:Braska。

本文連結:

https://www.cnblogs.com/braska/p/12753055.html