前言
上篇文章
《Spring Cloud中Hystrix 線程隔離導緻ThreadLocal資料丢失》我們對ThreadLocal資料丢失進行了詳細的分析,并通過代碼的方式複現了這個問題。
在上篇文章的末尾我也說了思路給大家提供了,如果需要能夠在Hystrix 為線程隔離模式也能正确傳遞資料的話,需要我們自己去修改。
我這邊以Zuul中自定義負載均衡政策來進行講解,在Zuul中需要實作灰階釋出的功能,需要在Filter中将請求的使用者資訊傳遞到自定的負載政策中,Zuul中整合了Hystrix,從Zuul Filter的請求到Ribbon的政策類中,線程已經發生了變化,變成了Hystrix提供的線程池來執行(配置隔離模式為線程)。這個時用ThreadLocal就會出問題了,資料傳輸會錯亂。也就是我們前面分析的問題。
關于修改我說下自己分析問題的一些思路,ransmittable-thread-local可以解決這個問題,可以對線程或者線程池進行修飾,其實最終的原理就是對線程進行包裝,線上程run之前和之後做一些處理來保證資料的正确傳遞。
https://blog.didispace.com/Spring-Cloud%E4%B8%ADHystrix-%E7%BA%BF%E7%A8%8B%E9%9A%94%E7%A6%BB%E5%AF%BC%E8%87%B4ThreadLocal%E6%95%B0%E6%8D%AE%E4%B8%A2%E5%A4%B1%EF%BC%88%E7%BB%AD%EF%BC%89/#%E6%94%B9%E9%80%A0%E6%80%9D%E8%B7%AF 改造思路
首先我想的就是改掉Hystrix中的線程池或者線程,隻有這樣才能讓ransmittable-thread-local來接管線程中資料的傳遞。
通過調試的方式找到com.netflix.hystrix.HystrixThreadPool是Hystrix線程池的接口,裡面定義了一個擷取ExecutorService方法,代碼如下:
public interface HystrixThreadPool {
/**
* Implementation of {@link ThreadPoolExecutor}.
*
* @return ThreadPoolExecutor
*/
public ExecutorService getExecutor();
}
通過查找接口的實作類,發現隻有一個預設的實作com.netflix.hystrix.HystrixThreadPool.HystrixThreadPoolDefault,實作也在接口中,是一個靜态類。實作的方法如下:
@Override
public ThreadPoolExecutor getExecutor() {
touchConfig();
return threadPool;
}
threadPool是類中的一個變量,主要是通過touchConfig方法來設定線程的參數,touchConfig代碼如下:
private void touchConfig() {
final int dynamicCoreSize = properties.coreSize().get();
final int configuredMaximumSize = properties.maximumSize().get();
int dynamicMaximumSize = properties.actualMaximumSize();
final boolean allowSizesToDiverge = properties.getAllowMaximumSizeToDivergeFromCoreSize().get();
boolean maxTooLow = false;
if (allowSizesToDiverge && configuredMaximumSize < dynamicCoreSize) {
//if user sets maximum < core (or defaults get us there), we need to maintain invariant of core <= maximum
dynamicMaximumSize = dynamicCoreSize;
maxTooLow = true;
}
//......
}
這是最外層擷取線程池的地方,可以根據代碼一步步進去看,最終擷取線程池的代碼在com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy.getThreadPool方法中。
上面是線程池的源碼分析,我們可以改造源碼,将線程池用ransmittable-thread-local進行修飾。
https://blog.didispace.com/Spring-Cloud%E4%B8%ADHystrix-%E7%BA%BF%E7%A8%8B%E9%9A%94%E7%A6%BB%E5%AF%BC%E8%87%B4ThreadLocal%E6%95%B0%E6%8D%AE%E4%B8%A2%E5%A4%B1%EF%BC%88%E7%BB%AD%EF%BC%89/#%E6%94%B9%E9%80%A0%E7%BA%BF%E7%A8%8B%E6%96%B9%E5%BC%8F 改造線程方式
另外一種是改造線程的方式,在Hystrix将指令丢入線程池的時候對線程進行修飾也可以解決此問題,因為ransmittable-thread-local對線程池進行修飾,其原理也是改造了線程,通過源碼可以看出:
public static ExecutorService getTtlExecutorService(ExecutorService executorService) {
if (executorService == null || executorService instanceof ExecutorServiceTtlWrapper) {
return executorService;
}
return new ExecutorServiceTtlWrapper(executorService);
}
class ExecutorServiceTtlWrapper extends ExecutorTtlWrapper implements ExecutorService {
private final ExecutorService executorService;
ExecutorServiceTtlWrapper(ExecutorService executorService) {
super(executorService);
this.executorService = executorService;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return executorService.submit(TtlCallable.get(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return executorService.submit(TtlRunnable.get(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return executorService.submit(TtlRunnable.get(task));
}
// ...........
}
重點在TtlRunnable.get()
改造Hystrix中線程的方式,可以通過HystrixContextScheduler進行入手,Hystrix通過HystrixContextScheduler的ThreadPoolScheduler把指令submit到ThreadPoolExecutor中去執行。
通過上面的分析,最終可以定位到送出指令的代碼如下:
private static class ThreadPoolWorker extends Worker {
private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final Func0<Boolean> shouldInterruptThread;
public ThreadPoolWorker(HystrixThreadPool threadPool, Func0<Boolean> shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}
@Override
public void unsubscribe() {
subscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return subscription.isUnsubscribed();
}
@Override
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(sa);
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
throw new IllegalStateException("Hystrix does not support delayed scheduling");
}
}
核心代碼在schedule方法中,隻需要将schedule中的sa進行修飾即可。
改造後的代碼如下:
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);
subscription.add(sa);
sa.addParent(subscription);
ThreadPoolExecutor executor = (ThreadPoolExecutor) threadPool.getExecutor();
FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa));
sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread, executor));
return sa;
}
改源碼還涉及到重新打包等問題,每個項目都得用修改後的jar包,比較麻煩,最簡單的做法就是在項目中建一個同樣的HystrixContextScheduler類,包名也要和之前一樣,讓jvm優先加載,這樣就能用這個修改的類來代替Hystrix原始的類。
最後我們來驗證下這樣的改動是否正确,首先我們在Zuul的Filter中進行值的傳遞:
RibbonFilterContextHolder是基于InheritableThreadLocal做的值傳遞,代碼如下:
public class RibbonFilterContextHolder {
private static final ThreadLocal<RibbonFilterContext> contextHolder = new InheritableThreadLocal<RibbonFilterContext>() {
@Override
protected RibbonFilterContext initialValue() {
return new DefaultRibbonFilterContext();
}
};
public static RibbonFilterContext getCurrentContext() {
return contextHolder.get();
}
public static void clearCurrentContext() {
contextHolder.remove();
}
}
完整源碼請參考:
https://github.com/yinjihuan/spring-cloud/blob/master/fangjia-common/src/main/java/com/fangjia/common/support/RibbonFilterContextHolder.javaprivate static AtomicInteger ac = new AtomicInteger();
@Override
public Object run() {
RequestContext ctx = RequestContext.getCurrentContext();
RibbonFilterContextHolder.getCurrentContext().add("servers",ac.addAndGet(1)+"");
return null;
}
通過AtomicInteger 進行數字的累加操作,後面測試的時候用10個線程并發測試,如如果在Ribbon的自定義負載政策中接收的值是0-9的話表示正确,否則錯誤。
接下來定義一個負載政策類,輸出接收的值:
public class GrayPushRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public GrayPushRule() {
this.nextServerCyclicCounter = new AtomicInteger(0);
}
public GrayPushRule(ILoadBalancer lb) {
this();
this.setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
String servers = RibbonFilterContextHolder.getCurrentContext().get("servers");
System.out.println(Thread.currentThread().getName()+":"+servers);
return null;
}
public Server choose(Object key) {
return this.choose(this.getLoadBalancer(), key);
}
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
然後增加配置,使用自定義的政策,還需要将Hystrix的線程池數量改小一點,這樣才可以線程複用
fsh-house.ribbon.NFLoadBalancerRuleClassName=com.fangjia.fsh.api.rule.GrayPushRule
# 線程隔離模式
zuul.ribbon-isolation-strategy=thread
hystrix.threadpool.default.coreSize=3
啟動服務,用ab進行測試:
ab -n 10 -c 10 http://192.168.10.170:2103/fsh-house/house/1
輸出結果如下:
hystrix-RibbonCommand-3:10
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-1:8
hystrix-RibbonCommand-3:10
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-1:8
hystrix-RibbonCommand-3:10
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-1:8
hystrix-RibbonCommand-3:10
很多資料都重複了,這就是線程複用導緻的問題,接下來我們用上面講的方式進行改造
需要将RibbonFilterContextHolder中的InheritableThreadLocal改成TransmittableThreadLocal
private static final TransmittableThreadLocal<RibbonFilterContext> contextHolder = new TransmittableThreadLocal<RibbonFilterContext>() {
@Override
protected RibbonFilterContext initialValue() {
return new DefaultRibbonFilterContext();
}
};
然後在項目中建立一個HystrixContextScheduler類,包名必須是com.netflix.hystrix.strategy.concurrency,代碼就按上面貼的進行改,主要是對線程進行修飾:
FutureTask<?> f = (FutureTask<?>) executor.submit(TtlRunnable.get(sa));
再次啟動服務,進行測試,結果如下:
hystrix-RibbonCommand-2:10
hystrix-RibbonCommand-1:1
hystrix-RibbonCommand-3:7
hystrix-RibbonCommand-3:8
hystrix-RibbonCommand-1:2
hystrix-RibbonCommand-2:4
hystrix-RibbonCommand-3:5
hystrix-RibbonCommand-1:9
hystrix-RibbonCommand-2:3
hystrix-RibbonCommand-3:6
現在的結果已經是正确的
https://blog.didispace.com/Spring-Cloud%E4%B8%ADHystrix-%E7%BA%BF%E7%A8%8B%E9%9A%94%E7%A6%BB%E5%AF%BC%E8%87%B4ThreadLocal%E6%95%B0%E6%8D%AE%E4%B8%A2%E5%A4%B1%EF%BC%88%E7%BB%AD%EF%BC%89/#%E6%94%B9%E9%80%A0%E7%BA%BF%E7%A8%8B%E6%B1%A0%E6%96%B9%E5%BC%8F 改造線程池方式
上面介紹了改造線程的方式,并且通過建一個同樣的Java類來覆寫Jar包中的實作,感覺有點投機取巧,其實不用這麼麻煩,Hystrix預設提供了HystrixPlugins類,可以讓使用者自定義線程池,下面來看看怎麼使用:
在啟動之前調用進行注冊自定義實作的邏輯:
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());
ThreadLocalHystrixConcurrencyStrategy就是我們自定義的建立線程池的類,需要繼承HystrixConcurrencyStrategy,前面也有講到通過調試代碼發現最終擷取線程池的代碼就在HystrixConcurrencyStrategy中。
我們隻需要重寫getThreadPool方法即可完成對線程池的改造,由于TtlExecutors隻能修飾ExecutorService和Executor,而HystrixConcurrencyStrategy中傳回的是ThreadPoolExecutor,我們需要對ThreadPoolExecutor進行包裝一層,最終在execute方法中對線程修飾,也就相當于改造了線程池。
public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return this.doGetThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixThreadPoolProperties threadPoolProperties) {
return this.doGetThreadPool(threadPoolKey, threadPoolProperties);
}
}
在doGetThreadPool方法中就傳回包裝的線程池,代碼如下:
return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue,
threadFactory);
最後就是ThreadLocalThreadPoolExecutor的代碼:
public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
}
@Override
public void execute(Runnable command) {
super.execute(TtlRunnable.get(command));
}
}