聊聊 Dubbo 的容错机制

之前看了 dubbo 的一些代码,在学习过程中,主要关注那些比较“高级”的内容,SPI,自适应扩展等,却忘了一些作为一个 rpc 框架最核心需要的部分,比如如何通信,序列化,网络,容错机制等等,因为其实这个最核心的就是远程调用,自适应扩展其实就是让代码可扩展性,可读性,更优雅等,写的搓一点其实也问题不大,但是一个合适的通信协议,序列化方法,如何容错等却是真正保证是一个 rpc 框架最重要的要素。
首先来看这张图
cluster
在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。
各节点关系:

  • 这里的 InvokerProvider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
  • Directory 代表多个 Invoker,可以把它看成 List<Invoker> ,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • ClusterDirectory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router 负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance 负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

集群容错模式

Failover Cluster

失败自动切换,当出现失败,重试其它服务器 1。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2” 来设置重试次数(不含第一次)。

重试次数配置如下:

<dubbo:service retries=”2” />
这里重点看下 Failover Cluster集群模式的实现

1
2
3
4
5
6
7
8
9
public class FailoverCluster implements Cluster {

public final static String NAME = "failover";

public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}

}

这个代码就非常简单,重点需要看FailoverClusterInvoker里的代码,FailoverClusterInvoker继承了AbstractClusterInvoker类,其中invoke 方法是在抽象类里实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
// 绑定 attachments 到 invocation 中.
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
// 列举 Invoker
List<Invoker<T>> invokers = list(invocation);
// 加载 LoadBalance 负载均衡器
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 调用 实际的 doInvoke 进行后续操作
return doInvoke(invocation, invokers, loadbalance);
}
// 这是个抽象方法,实际是由子类实现的
protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
LoadBalance loadbalance) throws RpcException;

然后重点就是FailoverClusterInvoker中的doInvoke方法了,其实它里面也就这么一个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
// 获取重试次数,这里默认是 2 次,还有可以注意下后面的+1
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 循环调用,失败重试
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
// 在进行重试前重新列举 Invoker,这样做的好处是,如果某个服务挂了,
// 通过调用 list 可得到最新可用的 Invoker 列表
copyInvokers = list(invocation);
// check again
// 对 copyinvokers 进行判空检查
checkInvokers(copyInvokers, invocation);
}
// 通过负载均衡来选择 invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 将其添加到 invoker 到 invoked 列表中
invoked.add(invoker);
// 设置上下文
RpcContext.getContext().setInvokers((List) invoked);
try {
// 正式调用
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}

Failfast Cluster

快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

Failback Cluster

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster

并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。

Broadcast Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错 2。通常用于通知所有提供者更新缓存或日志等本地资源信息。