之前看了 dubbo 的一些代码,在学习过程中,主要关注那些比较“高级”的内容,SPI,自适应扩展等,却忘了一些作为一个 rpc 框架最核心需要的部分,比如如何通信,序列化,网络,容错机制等等,因为其实这个最核心的就是远程调用,自适应扩展其实就是让代码可扩展性,可读性,更优雅等,写的搓一点其实也问题不大,但是一个合适的通信协议,序列化方法,如何容错等却是真正保证是一个 rpc 框架最重要的要素。 首先来看这张图 在集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。 各节点关系:
这里的 Invoker
是 Provider
的一个可调用 Service
的抽象,Invoker
封装了 Provider
地址及 Service
接口信息 Directory
代表多个 Invoker
,可以把它看成 List<Invoker>
,但与 List
不同的是,它的值可能是动态变化的,比如注册中心推送变更Cluster
将 Directory
中的多个 Invoker
伪装成一个 Invoker
,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个Router
负责从多个 Invoker
中按路由规则选出子集,比如读写分离,应用隔离等LoadBalance
负责从多个 Invoker
中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选集群容错模式 Failover Cluster 失败自动切换,当出现失败,重试其它服务器 1。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2” 来设置重试次数(不含第一次)。
重试次数配置如下:
<dubbo:service retries=”2” /> 这里重点看下 Failover Cluster
集群模式的实现1 2 3 4 5 6 7 8 9 public class FailoverCluster implements Cluster { public final static String NAME = "failover" ; public <T> Invoker<T> join (Directory<T> directory) throws RpcException { return new FailoverClusterInvoker <T>(directory); } }
这个代码就非常简单,重点需要看FailoverClusterInvoker
里的代码,FailoverClusterInvoker
继承了AbstractClusterInvoker
类,其中invoke 方法是在抽象类里实现的1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Override public Result invoke (final Invocation invocation) throws RpcException { checkWhetherDestroyed(); Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments(); if (contextAttachments != null && contextAttachments.size() != 0 ) { ((RpcInvocation) invocation).addObjectAttachments(contextAttachments); } List<Invoker<T>> invokers = list(invocation); LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected abstract Result doInvoke (Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException;
然后重点就是FailoverClusterInvoker
中的doInvoke
方法了,其实它里面也就这么一个方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke (Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1 ; if (len <= 0 ) { len = 1 ; } RpcException le = null ; List<Invoker<T>> invoked = new ArrayList <Invoker<T>>(copyInvokers.size()); Set<String> providers = new HashSet <String>(len); for (int i = 0 ; i < len; i++) { if (i > 0 ) { checkWhetherDestroyed(); copyInvokers = list(invocation); checkInvokers(copyInvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException (e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException (le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le); }
Failfast Cluster 快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
Failsafe Cluster 失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
Failback Cluster 失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
Forking Cluster 并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。
Broadcast Cluster 广播调用所有提供者,逐个调用,任意一台报错则报错 2。通常用于通知所有提供者更新缓存或日志等本地资源信息。