天天看点

dubbo的基础知识

dubbo

    • dubbo配置项
    • dubbo spi
    • 负载均衡策略
    • 异步调用
    • dubbo线程池
    • 路由规则

dubbo 是一款高性能,轻量级rpc框架,可以和spring集成

三大核心能力:

面向接口的远程方法调用

智能容错和负载均衡

服务自动注册和发现

其他特性:

高度可扩展能力 :协议,传输,序列化都被设计成扩展点

运行期流浪调度:配置路由规则实现灰度发布等功能

可视化服务治理和运维

dubbo配置项

<dubbo:application name="service-consumer"     >
         <dubbo:parameter key="qos.enable" value="true" ></dubbo:parameter>
         <dubbo:parameter key="qos.port" value="22222"></dubbo:parameter>
         <dubbo:parameter key="qos.accept.foreign.ip" value="true" ></dubbo:parameter>
    </dubbo:application>
    <!--    -->
     <dubbo:consumer timeout="2000"   check="false"  ></dubbo:consumer>

    <!-- 使用zookeeper注册中心暴露发现服务地址 -->
    <dubbo:registry address="zookeeper://127.0.0.1:2182"  timeout="10000"/>
	//...省略
           

dubbo:application

对应 org.apache.dubbo.config.ApplicationConfig类,代表当前应用的信息

name: 当前应用程序的名称。

owner: 当前应用程序的负责人。

qosEnable : 是否启动QoS 默认true(qos:在线运维dubbo 通过telnet 在线运维dubbo)

qosPort : 启动QoS绑定的端口 默认22222

qosAcceptForeignIp: 是否允许远程访问 默认是false

dubbo:registry

org.apache.dubbo.config.RegistryConfig, 代表该模块所使用的注册中心。一个模块中的服务可以将 其注册到多个注册中心上,也可以注册到一个上。在service和reference也会引入这个注册中心。

id : 当当前服务中provider或者consumer中存在多个注册中心时,则使用需要增加该配置。在一 些公司,会通过业务线的不同选择不同的注册中心,所以一般都会配置该值。

address : 当前注册中心的访问地址。

protocol : 当前注册中心所使用的协议是什么。也可以直接在 address 中写入,比如使用 zookeeper,就可以写成 zookeeper://xx.xx.xx.xx:2181

timeout : 当与注册中心不再同一个机房时,大多会把该参数延长。

dubbo:protocol

org.apache.dubbo.config.ProtocolConfig, 指定服务在进行数据传输所使用的协议。

id : 在大公司,可能因为各个部门技术栈不同,所以可能会选择使用不同的协议进行交互。这里

在多个协议使用时,需要指定。

name : 指定协议名称。默认使用 dubbo 。

dubbo:service

org.apache.dubbo.config.ServiceConfig, 服务提供者,用于指定当前需要对外暴露的服务信息,和dubbo:reference 大致相同。

interface : 指定当前需要进行对外暴露的接口是什么。

ref : 具体实现对象的引用,一般我们在生产级别都是使用Spring去进行Bean托管的,所以这里面 一般也指的是Spring中的BeanId。

version : 对外暴露的版本号。不同的版本号,消费者在消费的时候只会根据固定的版本号进行消费。

executes: 最大的并行度。

可能导致集群功能无法充分利用或者堵塞

但是也可以启动部分对应用的保护功能

可以不做配置,结合后面的熔断限流使用

dubbo:reference

org.apache.dubbo.config.ReferenceConfig, 消费者的配置

id : 指定该Bean在注册到Spring中的id。

interface: 服务接口名

version : 指定当前服务版本,与服务提供者的版本一致。

registry : 指定所具体使用的注册中心地址。这里面也就是使用上面在 dubbo:registry 中所声明的id。

mock: 用于在方法调用出现错误时,当做服务降级来统一对外返回结果。(dubbo:service也有)

timeout: 用于指定当前方法或者接口中所有方法的超时时间。(dubbo:service也有)

retries: 用于指定当前服务在执行时出现错误或者超时时的重试机制。(dubbo:service也有)

注意提供者是否有幂等,否则可能出现数据一致性问题

注意提供者是否有类似缓存机制,如出现大面积错误时,可能因为不停重试导致雪崩

dubbo:method

org.apache.dubbo.config.MethodConfig, 用于在制定的 dubbo:service 或者 dubbo:reference 中的 更具体一个层级,指定具体方法级别在进行RPC操作时候的配置,针对于具体方法的特殊处理。

name : 指定方法名称,用于对这个方法名称的RPC调用进行特殊配置。

async: 是否异步 默认false

dubbo spi

jdk spi

dubbo的spi和jdk的spi差不多

maven引入dubbo的依赖

<dependencies>
        <dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>
           
//接口上要加spi注解
@SPI("human")
public interface HelloService {
    String  sayHello();
}
           

resources文件下的META-INF/services/+接口路径 要写成META-INF/dubbo/+接口路径

内容建议写成key=value的格式,和jdkspi一样只写com.service.impl.HumanHelloService也可以只是不利于扩展

human=com.service.impl.HumanHelloService
           

测试一下

public class DubboSpiMain {
    public static void main(String[] args) {
        // 获取扩展加载器
        ExtensionLoader<HelloService>  extensionLoader  = ExtensionLoader.getExtensionLoader(HelloService.class);
        // 遍历所有的支持的扩展点 META-INF.dubbo
        Set<String>  extensions = extensionLoader.getSupportedExtensions();
        for (String extension : extensions){
            String result = extensionLoader.getExtension(extension).sayHello();
            System.out.println(result);
        }

    }
}
           

dubbo自己做SPI的目的

  1. JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源
  2. 如果有扩展点加载失败,则所有扩展点无法使用
  3. 提供了对扩展点包装的功能(Adaptive),并且还支持通过set的方式对其他的扩展点进行注入

Adaptive功能

Dubbo中的Adaptive功能,主要解决的问题是如何动态的选择具体的扩展点。通过 getAdaptiveExtension 统一对指定接口对应的所有扩展点进行封装,通过URL的方式对扩展点来进行动态选择。 (dubbo中所有的注册信息都是通过URL的形式进行处理的。)这里同样采用相同的方式进行 实现。

创建接口

//默认的实现类为key是human的类
@SPI("human")
public interface HelloService {
    String  sayHello();
    @Adaptive
    String  sayHello(URL  url);
}
           

创建两个实现类

public class DogHelloService implements HelloService{
    @Override
    public String sayHello() {
        return "wang wang";
    }

    @Override
    public String sayHello(URL url) {
        return "wang url";
    }
}
           
public class HumanHelloService implements HelloService{
    @Override
    public String sayHello() {
        return "hello 你好";
    }

    @Override
    public String sayHello(URL url) {
        return  "hello url";
    }
}
           

src/main/resources/META-INF/dubbo/com.test.service.HelloService中内容为

human=com.test.service.impl.HumanHelloService
dog=com.test.service.impl.DogHelloService
           

实际使用时根据url动态使用实现类

public class DubboAdaptiveMain {
    public static void main(String[] args) {
    	//?之前的不重要,主要是?后面的hello.service=dog 
    	//hello.service--》HelloService
        URL   url  = URL.valueOf("test://localhost/hello?hello.service=dog");
        HelloService  adaptiveExtension = ExtensionLoader.getExtensionLoader(HelloService.class).getAdaptiveExtension();
        String  msg = adaptiveExtension.sayHello(url);
        System.out.println(msg);
    }
}
           

利用dubbo的spi机制来自定义拦截器

实现 org.apache.dubbo.rpc.Filter 接口

@Activate(group = {CommonConstants.CONSUMER,CommonConstants.PROVIDER})
public class DubboInvokeFilter   implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        long   startTime  = System.currentTimeMillis();
        try {
            // 执行方法
            return  invoker.invoke(invocation);
        } finally {
            System.out.println("invoke time:"+(System.currentTimeMillis()-startTime) + "毫秒");
        }

    }
}
           

src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter内容

timeFilter=com.test.filter.DubboInvokeFilter
           

负载均衡策略

参考官网

Random LoadBalance

随机,按权重设置随机概率。

在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

RoundRobin LoadBalance

轮询,按公约后的权重设置轮询比率。

存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

LeastActive LoadBalance

最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。

使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

ConsistentHash LoadBalance

一致性 Hash,相同参数的请求总是发到同一提供者。

当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。

算法参见:http://en.wikipedia.org/wiki/Consistent_hashing

缺省只对第一个参数 Hash,如果要修改,请配置 <dubbo:parameter key=“hash.arguments” value=“0,1” />

缺省用 160 份虚拟节点,如果要修改,请配置 <dubbo:parameter key=“hash.nodes” value=“320” />

配置负载均衡

//服务端服务级别
<dubbo:service interface="..." loadbalance="roundrobin" />
//客户端服务级别
<dubbo:reference interface="..." loadbalance="roundrobin" />
//服务端方法级别
<dubbo:service interface="...">
    <dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:service>
//客户端方法级别
<dubbo:reference interface="...">
    <dubbo:method name="..." loadbalance="roundrobin"/>
</dubbo:reference>

//在服务提供者一方配置负载均衡
@Service(loadbalance = "random")
public class HelloServiceImpl implements HelloService {
    public String sayHello(String name) {
        return "hello " + name;
	} 
}
//在服务消费者一方配置负载均衡策略
@Reference(check = false,loadbalance = "random")
           

自定义负载均衡

实现org.apache.dubbo.rpc.cluster.LoadBalance接口

public class OnlyFirstLoadbalancer implements LoadBalance {
    @Override
    public <T> Invoker<T> select(List<Invoker<T>> list, URL url, Invocation invocation) throws RpcException {
        // 所有的服务提供者 按照IP  + 端口排序   选择第一个
        return  list.stream().sorted((i1,i2)->{
            final   int  ipCompare = i1.getUrl().getIp().compareTo(i2.getUrl().getIp());
            if(ipCompare == 0){
                 return   Integer.compare(i1.getUrl().getPort(),i2.getUrl().getPort());
            }
            return  ipCompare;
        }).findFirst().get();

    }
}
           

在dubbo-spi-loadbalance工程的 META-INF/dubbo 目录下新建

org.apache.dubbo.rpc.cluster.LoadBalance 文件,并将当前类的全名写入

onlyFirst=com.laogu.loadbalance.OnlyFirstLoadbalancer
           

使用配置

@Reference(loadbalance = "onlyFirst")
    private HelloService helloService;
           

异步调用

通过xml方式引入

<dubbo:reference id="helloService" interface="com.test.service.HelloService">
	 <dubbo:method name="sayHello" async="true" />
</dubbo:reference>
           

获得异步方法的返回结果

异步调用特殊说明

需要特别说明的是,该方式的使用,请确保dubbo的版本在2.5.4及以后的版本使用。 原因在于在2.5.3 及之前的版本使用的时候,会出现异步状态传递问题。

比如我们的服务调用关系是 A -> B -> C , 这时候如果A向B发起了异步请求,在错误的版本时,B向C发 起的请求也会连带的产生异步请求。这是因为在底层实现层面,他是通过 RPCContext 中的

attachment 实现的。在A向B发起异步请求时,会在 attachment 中增加一个异步标示字段来表明异步 等待结果。B在接受到A中的请求时,会通过该字段来判断是否是异步处理。但是由于值传递问题,B向 C发起时同样会将该值进行传递,导致C误以为需要异步结果,导致返回空。这个问题在2.5.4及以后的 版本进行了修正。

dubbo线程池

dubbo在使用时,都是通过创建真实的业务线程池进行操作的。目前已知的线程池模型有两个和java中 的相互对应:

fix: 表示创建固定大小的线程池。也是Dubbo默认的使用方式,默认创建的执行线程数为200,并且是没有任何等待队列的。所以再极端的情况下可能会存在问题,比如某个操作大量执行时,可能存在堵塞的情况。

public class FixedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
           

cache: 创建非固定大小的线程池,当线程不足时,会自动创建新的线程。但是使用这种的时候需 要注意,如果突然有高TPS的请求过来,方法没有及时完成,则会造成大量的线程创建,对系统的 CPU和负载都是压力,执行越多反而会拖慢整个系统。

public class CachedThreadPool implements ThreadPool {
    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
           

自定义线程池

自定义一个监控线程数量报警的线程池

public class WachingThreadPool  extends FixedThreadPool  implements  Runnable{
    private  static  final Logger  LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);
    // 定义线程池使用的阀值
    private  static  final  double  ALARM_PERCENT = 0.90;
    private  final Map<URL, ThreadPoolExecutor>    THREAD_POOLS = new ConcurrentHashMap<>();
    public  WachingThreadPool(){
        // 每隔3秒打印线程使用情况
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this,1,3, TimeUnit.SECONDS);
    }
    // 通过父类创建线程池
    @Override
    public Executor getExecutor(URL url) {
         final  Executor executor = super.getExecutor(url);
         if(executor instanceof  ThreadPoolExecutor){
             THREAD_POOLS.put(url,(ThreadPoolExecutor)executor);
         }
         return  executor;
    }

    @Override
    public void run() {
         // 遍历线程池
         for (Map.Entry<URL,ThreadPoolExecutor> entry: THREAD_POOLS.entrySet()){
              final   URL  url = entry.getKey();
              final   ThreadPoolExecutor  executor = entry.getValue();
              // 计算相关指标
              final  int  activeCount  = executor.getActiveCount();
              final  int  poolSize = executor.getCorePoolSize();
              double  usedPercent = activeCount / (poolSize*1.0);
              LOGGER.info("线程池执行状态:[{}/{}:{}%]",activeCount,poolSize,usedPercent*100);
              if (usedPercent > ALARM_PERCENT){
                  LOGGER.error("超出警戒线! host:{} 当前使用率是:{},URL:{}",url.getIp(),usedPercent*100,url);
              }

         }
    }
}
           

src/main/resources/META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool的内容为

watching=com.test.threadpool.WachingThreadPool
           

在服务提供方项目中设置使用该线程池生成器

dubbo的基础知识

路由规则

路由是决定一次请求中需要发往目标机器的重要判断,通过对其控制可以决定请求的目标机器。我们可 以通过创建这样的规则来决定一个请求会交给哪些服务器去处理。

快速入门

//执行了这个之后消费端正常调用提供端,就会按照路由规则执行
public class DubboRouterMain {
    public static void main(String[] args) {
		RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension() ;
		Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://127.0.0.1:2181"));
		//condition表示这是一个条件路由
		registry.register(URL.valueOf("condition://0.0.0.0/com.test.service.HelloService?category=routers&force=true&dynamic=true&rule=" + URL.encode("=> host != 192.168.XX.XX")));
	} 
}
           

参考官网

路由规则

向注册中心写入路由规则的操作通常由监控中心或治理中心的页面完成

RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("route://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule=" + URL.encode("host = 10.20.153.10 => host = 10.20.153.11")));
           

route:// 表示路由规则的类型,支持条件路由规则和脚本路由规则,可扩展,必填。

0.0.0.0 表示对所有 IP 地址生效,如果只想对某个 IP 的生效,请填入具体 IP,必填。

com.foo.BarService 表示只对指定服务生效,必填。

group=foo 对指定服务的指定group生效,不填表示对未配置group的指定服务生效

version=1.0对指定服务的指定version生效,不填表示对未配置version的指定服务生效

category=routers 表示该数据为动态配置类型,必填。

dynamic=false 表示该数据为持久数据,当注册方退出时,数据依然保存在注册中心,必填。

enabled=true 覆盖规则是否生效,可不填,缺省生效。

force=false 当路由结果为空时,是否强制执行,如果不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为 false。

runtime=false 是否在每次调用时执行路由规则,否则只在提供者地址列表变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,必须设为 true,需要注意设置会影响调用的性能,可不填,缺省为 false。

priority=1 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为 0。

rule=URL.encode(“host = 10.20.153.10 => host = 10.20.153.11”) 表示路由规则的内容,必填。

条件路由规则

基于条件表达式的路由规则,如:host = 10.20.153.10 => host = 10.20.153.11

规则:

=> 之前的为消费者匹配条件,所有参数和消费者的 URL 进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。

=> 之后为提供者地址列表的过滤条件,所有参数和提供者的 URL 进行对比,消费者最终只拿到过滤后的地址列表。

如果匹配条件为空,表示对所有消费方应用,如:=> host != 10.20.153.11

如果过滤条件为空,表示禁止访问,如:host = 10.20.153.10 =>