博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo源码分析-集群容错(二)
阅读量:6352 次
发布时间:2019-06-22

本文共 8988 字,大约阅读时间需要 29 分钟。

hot3.png

FailbackClusterInvoker

FailbackClusterInvoke是失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。

public class FailbackClusterInvoker
extends AbstractClusterInvoker
{ private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); private static final long RETRY_FAILED_PERIOD = 5000L; private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); private final ConcurrentMap
> failed = new ConcurrentHashMap(); private volatile ScheduledFuture
retryFuture; public FailbackClusterInvoker(Directory
directory) { super(directory); } private void addFailed(Invocation invocation, AbstractClusterInvoker
router) { if (this.retryFuture == null) { //使用对象锁 初始化定时任务 synchronized(this) { if (this.retryFuture == null) { this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { public void run() { try { //调用当前实例的重试方法 重试后移除 FailbackClusterInvoker.this.retryFailed(); } catch (Throwable var2) { FailbackClusterInvoker.logger.error("Unexpected error occur at collect statistic", var2); } } }, 5000L, 5000L, TimeUnit.MILLISECONDS); } } } this.failed.put(invocation, router); } void retryFailed() { if (this.failed.size() != 0) { Iterator i$ = (new HashMap(this.failed)).entrySet().iterator(); while(i$.hasNext()) { Entry
> entry = (Entry)i$.next(); Invocation invocation = (Invocation)entry.getKey(); Invoker invoker = (Invoker)entry.getValue(); try { invoker.invoke(invocation); this.failed.remove(invocation); } catch (Throwable var6) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", var6); } } } } protected Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { try { this.checkInvokers(invokers, invocation); Invoker
invoker = this.select(loadbalance, invocation, invokers, (List)null); return invoker.invoke(invocation); } catch (Throwable var5) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + var5.getMessage() + ", ", var5); //如果调用失败 放入失败列表 this.addFailed(invocation, this); return new RpcResult(); } }}

FailsafeClusterInvoker

FailsafeClusterInvoker实现了调用失败 直接返回空的对象

public class FailsafeClusterInvoker
extends AbstractClusterInvoker
{ private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory
directory) { super(directory); } public Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { try { this.checkInvokers(invokers, invocation); Invoker
invoker = this.select(loadbalance, invocation, invokers, (List)null); return invoker.invoke(invocation); } catch (Throwable var5) { logger.error("Failsafe ignore exception: " + var5.getMessage(), var5); return new RpcResult(); } }}

FailfastClusterInvoker

FailfastClusterInvoker实现了只调用一次 失败后抛异常

public class FailfastClusterInvoker
extends AbstractClusterInvoker
{ public FailfastClusterInvoker(Directory
directory) { super(directory); } public Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); try { return invoker.invoke(invocation); } catch (Throwable var6) { if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) { throw (RpcException)var6; } else { throw new RpcException(var6 instanceof RpcException ? ((RpcException)var6).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + this.getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + var6.getMessage(), var6.getCause() != null ? var6.getCause() : var6); } } }}

ForkingClusterInvoker

ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。

public class ForkingClusterInvoker
extends AbstractClusterInvoker
{ private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory
directory) { super(directory); } public Result doInvoke(final Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); int forks = this.getUrl().getParameter("forks", 2); int timeout = this.getUrl().getParameter("timeout", 1000); final Object selected; if (forks > 0 && forks < invokers.size()) { selected = new ArrayList(); for(int i = 0; i < forks; ++i) { Invoker
invoker = this.select(loadbalance, invocation, invokers, (List)selected); if (!((List)selected).contains(invoker)) { ((List)selected).add(invoker); } } } else { selected = invokers; } RpcContext.getContext().setInvokers((List)selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue
ref = new LinkedBlockingQueue(); Iterator i$ = ((List)selected).iterator(); while(i$.hasNext()) { final Invoker
invoker = (Invoker)i$.next(); //同时执行多个调用任务 this.executor.execute(new Runnable() { public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable var3) { int value = count.incrementAndGet(); if (value >= ((List)selected).size()) { ref.offer(var3); } } } }); } try { //指定时间后 获取队列返回的结果 无论是否有异常 有一个结果即返回 Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable)ret; throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } else { return (Result)ret; } } catch (InterruptedException var11) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + var11.getMessage(), var11); } }}

BroadcastClusterInvoker

BroadcastClusterInvoker会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。该类通常用于通知所有提供者更新缓存或日志等本地资源信息。

public class BroadcastClusterInvoker
extends AbstractClusterInvoker
{ private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory
directory) { super(directory); } public Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { this.checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers(invokers); RpcException exception = null; Result result = null; Iterator i$ = invokers.iterator(); while(i$.hasNext()) { Invoker invoker = (Invoker)i$.next(); try { result = invoker.invoke(invocation); } catch (RpcException var9) { exception = var9; logger.warn(var9.getMessage(), var9); } catch (Throwable var10) { exception = new RpcException(var10.getMessage(), var10); logger.warn(var10.getMessage(), var10); } } if (exception != null) { throw exception; } else { return result; } }}

转载于:https://my.oschina.net/odetteisgorgeous/blog/3019477

你可能感兴趣的文章
继 One Step 后,锤子科技 Big Bang 正式开源
查看>>
《数据科学:R语言实现》——2.5 使用Excel文件
查看>>
《淘宝店铺设计装修一册通》一2.5 抠图工具的简单运用
查看>>
《音乐达人秀:Adobe Audition实战200例》——实例4 收音机音乐节目转录到电脑里...
查看>>
《JavaScript应用程序设计》一一3.1 过时的类继承
查看>>
千万PV是什么意思?
查看>>
Amazon 推出 API 网关使用计划
查看>>
互联网流量超出路由器上限 或致全球断网
查看>>
《基于ArcGIS的Python编程秘笈(第2版)》——2.5 限制图层列表
查看>>
GNOME 地图 3.20 加入更多新特性 可用性得到加强
查看>>
《代码整洁之道:程序员的职业素养》导读
查看>>
《计算复杂性:现代方法》——习题
查看>>
Mozilla 释出更新修复中间人攻击漏洞
查看>>
思科表态反对网络中立
查看>>
《HTML5+CSS3网页设计入门必读》——1.5 利用多种Web浏览器执行测试
查看>>
Velocity官方指南-容器
查看>>
国家为何如此重视石墨烯?
查看>>
《Python和Pygame游戏开发指南》——1.14 配套网站上的更多信息
查看>>
Kafka+Flink 实现准实时异常检测系统
查看>>
利用mybatis查询两级树形菜单
查看>>