FailbackClusterInvoker
FailbackClusterInvoke是失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。
public class FailbackClusterInvokerextends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); private static final long RETRY_FAILED_PERIOD = 5000L; private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); private final ConcurrentMap > failed = new ConcurrentHashMap(); private volatile ScheduledFuture retryFuture; public FailbackClusterInvoker(Directory directory) { super(directory); } private void addFailed(Invocation invocation, AbstractClusterInvoker router) { if (this.retryFuture == null) { //使用对象锁 初始化定时任务 synchronized(this) { if (this.retryFuture == null) { this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { try { //调用当前实例的重试方法 重试后移除 FailbackClusterInvoker.this.retryFailed(); } catch (Throwable var2) { FailbackClusterInvoker.logger.error("Unexpected error occur at collect statistic", var2); } } }, 5000L, 5000L, TimeUnit.MILLISECONDS); } } } this.failed.put(invocation, router); } void retryFailed() { if (this.failed.size() != 0) { Iterator i$ = (new HashMap(this.failed)).entrySet().iterator(); while(i$.hasNext()) { Entry > entry = (Entry)i$.next(); Invocation invocation = (Invocation)entry.getKey(); Invoker invoker = (Invoker)entry.getValue(); try { invoker.invoke(invocation); this.failed.remove(invocation); } catch (Throwable var6) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", var6); } } } } protected Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { this.checkInvokers(invokers, invocation); Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); return invoker.invoke(invocation); } catch (Throwable var5) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + var5.getMessage() + ", ", var5); //如果调用失败 放入失败列表 this.addFailed(invocation, this); return new RpcResult(); } }}
FailsafeClusterInvoker
FailsafeClusterInvoker实现了调用失败 直接返回空的对象
public class FailsafeClusterInvokerextends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory directory) { super(directory); } public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { this.checkInvokers(invokers, invocation); Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); return invoker.invoke(invocation); } catch (Throwable var5) { logger.error("Failsafe ignore exception: " + var5.getMessage(), var5); return new RpcResult(); } }}
FailfastClusterInvoker
FailfastClusterInvoker实现了只调用一次 失败后抛异常
public class FailfastClusterInvokerextends AbstractClusterInvoker { public FailfastClusterInvoker(Directory directory) { super(directory); } public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); try { return invoker.invoke(invocation); } catch (Throwable var6) { if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) { throw (RpcException)var6; } else { throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + this.getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6); } } }}
ForkingClusterInvoker
ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。
public class ForkingClusterInvokerextends AbstractClusterInvoker { private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory directory) { super(directory); } public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); int forks = this.getUrl().getParameter("forks", 2); int timeout = this.getUrl().getParameter("timeout", 1000); final Object selected; if (forks > 0 && forks < invokers.size()) { selected = new ArrayList(); for(int i = 0; i < forks; ++i) { Invoker invoker = this.select(loadbalance, invocation, invokers, (List)selected); if (!((List)selected).contains(invoker)) { ((List)selected).add(invoker); } } } else { selected = invokers; } RpcContext.getContext().setInvokers((List)selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue
BroadcastClusterInvoker
BroadcastClusterInvoker会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。
public class BroadcastClusterInvokerextends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory directory) { super(directory); } public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers(invokers); RpcException exception = null; Result result = null; Iterator i$ = invokers.iterator(); while(i$.hasNext()) { Invoker invoker = (Invoker)i$.next(); try { result = invoker.invoke(invocation); } catch (RpcException var9) { exception = var9; logger.warn(var9.getMessage(), var9); } catch (Throwable var10) { exception = new RpcException(var10.getMessage(), var10); logger.warn(var10.getMessage(), var10); } } if (exception != null) { throw exception; } else { return result; } }}