Hystrix半成品
2021-10-31 21:25:39 0 举报
登录查看完整内容
为你推荐
查看更多
Hystrix
作者其他创作
大纲/内容
HystrixCommand#queue()
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
.................... Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); .....................afterCache = hystrixObservable;return afterCache .doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook);
return Observable.defer(new Func0<Observable<R>>()
call()
applyHystrixSemantics(final AbstractCommand _cmd)
return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext);
@Override public Worker createWorker() { return new HystrixContextSchedulerWorker(actualScheduler.createWorker()); }
Observable<R> execution;if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd));} else { execution = executeCommandWithSpecifiedIsolation(_cmd);}return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext);
Observable<R> userObservable; try { userObservable = getExecutionObservable(); } catch (Throwable ex) { /* the run() method is a user provided implementation so can throw instead of using Observable.onError, so we catch it here and turn it into Observable.error */ userObservable = Observable.error(ex); } return userObservable .lift(new ExecutionHookApplication(_cmd)) .lift(new DeprecatedOnRunHookApplication(_cmd));
execute()
final AbstractCommand<R> _cmd
return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease);
final Action0 unsubscribeCommandCleanup
Observable.defer()
toObservable().get()
HystrixContextScheduler$HystrixContextSchedulerWorker
AbstractCommand#toObservable()
HystrixContextScheduler
@Override public Subscription schedule(Action0 action) { if (threadPool != null) { if (!threadPool.isQueueSpaceAvailable()) { throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold."); } } ...... }
HystrixContextScheduler$ThreadPoolScheduler
断路器允许访问
HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) {......}
queue()
return handleShortCircuitViaFallback();
final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = ......; final Action1<Throwable> markExceptionThrown = ......; if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); }
FutureTask异步Observable->call()
final Action0 terminateCommandCleanup
return applyHystrixSemantics(_cmd);
断路器不允许访问
HystrixContextScheduler#schedule(final Action0 action)
return hystrixCommand.execute();
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never();}
executionHook.onThreadStart(_cmd);executionHook.onRunStart(_cmd);executionHook.onExecutionStart(_cmd);return getUserExecutionObservable(_cmd);
call()
final Func0<Observable<R>> applyHystrixSemantics
final Action0 fireOnCompletedHook
0 条评论
回复 删除
下一页