1、概念
1)服务提供者超时是指远程调用服务的方法执行的超时时间.
2)服务调用者超时是指服务调用者调用远程方法的执行超时时间.
2、超时设置
使用dubbo进行远程调用的过程中,需要设置远程调用的超时间.超时时间分别可以在服务的提供者配置中设置,也可以在服务调用者配置中设置,超时时间的单位是毫秒.
1)全局超时配置
<dubbo:consumer timeout="5000" /> 或
<dubbo:provider timeout="5000" />
2)接口及指定方法超时配置
<dubbo:reference interhljs-string">"com.foo.BarService" timeout="2000">
<dubbo:method name="sayHello" timeout="3000" />
</dubbo:reference>
或
<dubbo:provider interhljs-string">"com.foo.BarService" timeout="2000">
<dubbo:method name="sayHello" timeout="3000" />
</dubbo:provider>
3、超时覆盖机制
1、方法级配置别优于接口级别,即小Scope优先
2、Consumer端配置 优于 Provider配置 优于 全局配置,最后是Dubbo Hard Code的配置值
dubbo的机制是如果服务的调用者配置了超时时间,会覆盖服务的提供者设置的超时时间.请注意,如果服务的调用者覆盖了服务提供者的远程方法调用超时时间,那么对于服务的提供者就会变得不可控,即服务的调用者控制了服务提供者方法执行的超时时间,这对于一次远程调用是非常不合理的,所以dubbo非常不建议在服务的调用者配置中配置服务的超时时间.
4、超时重试
dubbo在调用服务不成功时,默认是会重试两次的。这样在服务端的处理时间超过了设定的超时时间时,就会有重复请求,比如在发邮件时,可能就会发出多份重复邮件,执行注册请求时,就会插入多条重复的注册数据,那么怎么解决超时问题呢?如下
1)对于核心的服务中心,去除dubbo超时重试机制,并重新评估设置超时时间。
2)业务处理代码必须放在服务端,客户端只做参数验证和服务调用,不涉及业务流程处理
全局配置实例
<!-- 延迟到Spring初始化完成后,再暴露服务,服务调用超时设置为6秒,超时不重试-->
<dubbo:provider delay="-1" timeout="6000" retries="0"/>
5、Dubbo调用源码分析
Dubbo协议超时实现使用了Future模式,主要涉及类DubboInvoker,ResponseFuture, DefaultFuture。
在DubboInvoker中会判断是同步异步还是不需要返回值,doInvoke()方法如下
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == ) {
currentClient = clients[];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
1.如果不需要返回值,直接使用send方法,发送出去,设置当期和线程绑定RpcContext的future为null
2.如果需要异步通信,使用request方法构建一个ResponseFuture,然后设置到和线程绑定的RpcContext中
3.如果需要同步通信,使用request方法构建一个ResponseFuture,阻塞等待请求完成
ResponseFuture.get()在请求还未处理完或未到超时前一直是wait状态;响应达到后,设置请求状态,并进行notify唤醒。get()方法如下
public Object get(int timeout) throws RemotingException {
if (timeout <= ) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (! isDone()) {
long start = System.currentTimeMillis();
lock.lock();//加锁
try {
while (! isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS); //等待timeout
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (! isDone()) {// 客户端超时仍然没有得到服务端返回,抛出异常
throw new TimeoutException(sent > , channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
Client端的处理最终转化成ChannelHandler接口实现上,HeaderExchangeHandler的received()接口如下
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
//服务端处理请求
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > ) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
public static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
received()方法又会调用DefaultFuture的received()方法,如下
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
客户端调用远程服务时,本地会生成一个DefaultFuture,调用DefaultFuture.get()获取远程服务返回的结构,此方法获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,抛出TimeOutException;如果被唤醒,则返回rpc的结果,如果超时,服务端TimeoutFilter会根据服务端timeout检测到操作超时,打出warn日志。
而Dubbo生成对象ResponseFuture时,在一个全局map里通过put(ID,Future)将该次调用的唯一ID存放起来,然后传递给服务端,再服务端又回传回来,通过该ID,DefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,即阻塞请求线程的那个对象。调用它的doReceived方法,就可唤醒阻塞的线程,拿到返回结果
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
7、案例1:客户端超时时间>服务端超时时间
服务端配置:
<dubbo:service ref="xxxService" interface="com.xxx.XxxService" timeout="5000" />
客户端配置:
<dubbo:reference id="xxxService" interhljs-string">"com.xxx.XxxService" timeout="10000" />
客户端调用远程服务时,本地会生成一个DefaultFuture,调用DefaultFuture.get()获取远程服务返回的结构,此方法获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,抛出TimeOutException;如果被唤醒,则返回rpc的结果在服务端timeout时间为5s,如果实际的数据操作耗时7s,服务端TimeoutFilter会根据服务端timeout检测到操作超时,打出warn日志。在第7s,客户端接收到数据包,客户端timeout设置为10s>7s,DefaultFuture被唤醒,仍然可以接收到Rpc返回值。
如果超出了10s还没有返回值,抛出TimeoutException。此时这个DefaultFuture超时了,有一个线程RemotingInvocationTimeoutScan,清理所有超时的DefaultFuture,创建一个timeoutResponse,DefaultFuture.received这样的response就会抛出TimeoutException。
综上所诉:当客户端timeout值>服务端timeout值,会出现超时日志,但是仍然可以获取到结果。客户端timeout超时抛出异常时,对应超时的Future会自动清理。
引用:https://blog.csdn.net/peerless_hero/article/details/68922880
https://blog.csdn.net/qq418517226/article/details/51906357