基于netty實作rpc架構-spring boot服務端
demo位址
https://gitee.com/syher/grave-nettyRPC介紹
首先了解一下RPC:遠端過程調用。簡單點說就是本地應用可以調用遠端伺服器的接口。那麼通過什麼方式調用遠端接口呢?說白了RPC隻是一種概念。他的調用可以基于HTTP實作,也可以基于TCP/IP實作。甚至私人定制的通訊協定。
當然,私人定制通訊協定成本過高且不具備通用性。我們不做展開讨論(其實我也展不開。。。)。那為什麼不使用HTTP協定呢?受限于HTTP協定層級過高,資料傳輸效率不如TCP/IP。是以RPC遠端調用一般采用TCP/IP實作。即調用socket方法。
RPC實作原理
- 用戶端發起遠端服務調用。
- 用戶端将類資訊、調用方法和入參資訊通過socket通道發送給服務端。
- 服務端解析資料包,調用本地接口。
5.将執行結果通過socket傳回給用戶端。
6.用戶端拿到并解析傳回結果。
RPC實作
java如何實作一個rpc架構,其實就是按照上面的原理再做一些詳細的補充。比如通過動态代理封裝用戶端的資料包、通過反射機制實作服務端實作類的調用等等。
今天,我們先基于spring boot + netty 做rpc服務端的實作。
首先,做一個注解用于辨別接口提供rpc調用。
1
2
3
4
5
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {
String name() default "";
}
該注解用于提供服務的實作類上。
public interface INettyService {
String getString();
其實作類:
6
7
8
9
package com.braska.grave.netty.server.service;
@Service // 該注解為自定義rpc服務注解
public class NettyService implements INettyService {
@Override
public String getString() {
return "welcome to use netty rpc.";
}
接着,定義一個注解用來掃描指定包名下的Service注解。
@Target({ElementType.TYPE})
@Documented
@Import({NettyServerScannerRegistrar.class, NettyServerApplicationContextAware.class})
public @interface NettyServerScan {
String[] basePackages();
該注解用于spring boot啟動類上,參數basePackages指定服務所在的包路徑。
10
11
@SpringBootApplication
@NettyServerScan(basePackages = {
"com.braska.grave.netty.server.service"
})
public class GraveNettyServerApplication {
public static void main(String[] args) {
SpringApplication.run(GraveNettyServerApplication.class, args);
NettyServerScannerRegistrar類處理服務的spring bean注冊。
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class NettyServerScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar, ResourceLoaderAware {
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { // 建立掃描器執行個體
NettyServerInterfaceScanner scanner = new NettyServerInterfaceScanner(registry);
if (this.resourceLoader != null) {
scanner.setResourceLoader(this.resourceLoader);
}
AnnotationAttributes annoAttrs =
AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyServerScan.class.getName()));
List basePackages = new ArrayList();
for (String pkg : annoAttrs.getStringArray("basePackages")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
// 隻掃描指定的注解。
scanner.setAnnotationClass(Service.class);
scanner.registerFilters(); // 将basePackages裡面的通過@Service注解的類注冊成spring bean。
scanner.doScan(StringUtils.toStringArray(basePackages));
NettyServerApplicationContextAware類,暴露socket server端口。
26
27
28
29
public class NettyServerApplicationContextAware implements ApplicationContextAware, InitializingBean {
private static final Logger logger = Logger.getLogger(NettyServerApplicationContextAware.class.getName()); // 存儲接口與實作類的映射,其中key是接口名。value是實作類的bean。
private Map serviceMap = new HashMap<>(); // 服務worker。包含netty socket服務端生命周期及讀寫。
ServerWorker runner;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
String address = applicationContext.getEnvironment().getProperty("remoteAddress");
Map beans = applicationContext.getBeansWithAnnotation(Service.class);
for (Object serviceBean : beans.values()) {
Class<?> clazz = serviceBean.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> inter : interfaces) {
String interfaceName = inter.getName();
serviceMap.put(interfaceName, serviceBean);
} // 建立netty worker對象
runner = new ServerWorker(address, serviceMap);
public void afterPropertiesSet() throws Exception { // 建立netty socketServer及通道處理器
runner.open();
ServerWorker類的open方法。
public class ServerWorker extends ChannelInitializer {
// socket ip:port private String remoteAddress;
// 實作類的beanMap private Map serviceMap;
// netty channel處理器 NettyServerHandler handler;public void open() { try { int parallel = Runtime.getRuntime().availableProcessors() * 2; ServerBootstrap bootstrap = new ServerBootstrap(); this.bossGroup = new NioEventLoopGroup(); // todo 使用線程池,提高并發能力 this.workerGroup = new NioEventLoopGroup(parallel); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(this); String[] hostAndPort = this.remoteAddress.split(":"); if (hostAndPort == null || hostAndPort.length != 2) { throw new RuntimeException("remoteAddress is error."); } ChannelFuture cf = bootstrap.bind(hostAndPort[0], Integer.parseInt(hostAndPort[1])).sync(); // todo 資訊寫入注冊中心 // registry.register(serverAddress); logger.info("netty 伺服器啟動.監聽端口:" + hostAndPort[1]); // 等待服務端監聽端口關閉 cf.channel().closeFuture().sync(); } catch (Exception e) { logger.log(Level.SEVERE, "netty server open failed.", e); this.bossGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); } } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JSONEncoder()); pipeline.addLast(new JSONDecoder()); pipeline.addLast(this.handler); } }
NettyServerHandler服務端channel處理器,繼承ChannelInboundHandlerAdapter。
30
31
32
33
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private Map serviceMap;
public NettyServerHandler(Map serviceMap) {
this.serviceMap = serviceMap;
public void channelRead(ChannelHandlerContext ctx, Object msg) { // 解析用戶端發送過來的資料。包含類名、方法名、入參等資訊。
Request request = JSON.parseObject(msg.toString(), Request.class);
Response response = new Response();
response.setRequestId(request.getId());
try { // 調用本地實作類
Object res = this.handler(request);
response.setData(res);
} catch (Exception e) {
response.setCode(-1);
response.setError(e.getMessage());
logger.log(Level.SEVERE, "請求調用失敗", e);
} // 傳回處理結果給用戶端
ctx.writeAndFlush(response);
private Object handler(Request request) throws Exception {
String className = request.getClassName(); // 通過className從beanMap映射中找到托管給spring的bean實作類。
Object serviceBean = serviceMap.get(className);
String methodName = request.getMethodName();
Object[] parameters = request.getParameters(); // 通過反射機制調用實作類。并傳回調用結果。
return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
至此,rpc服務端的實作就完成了。
一路看下來,服務端的代碼實作還是比較簡單的。核心代碼隻有兩個類:ServerWorker和NettyServerHandler。其餘的都是對spring bean注冊的支援。
EOF
本文作者:Braska。
本文連結:
https://www.cnblogs.com/braska/p/12753055.html