天天看點

有狀态InheritableThreadLocal 配合 JDK8 ,異步方法調用

我們可以把一個類的作用域注解為

@Scope(scopeName = WebApplicationContext.SCOPE_SESSION, proxyMode = ScopedProxyMode.TARGET_CLASS)      

這樣這個類就能在session中擷取,可以把使用者資訊放到這個類中,需要的時候,直接@Autowire進來就好了.

但是這樣有一個坑.在主線程中,如果使用JDK異步方法,或者自己new出新的線程中,沒有辦法注入.會提示一個異常

Scope 'session' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton;      

這個異常非常誤導人,在單線程下是不會提示的.提示了這個異常,原因是:session 是根據request擷取的,那麼Spring是用requestContextHolder實作的,原理還是InheritableThreadLocal ,按道理來說,在子線程也是可以擷取到父線程的request.getSession,至于為什麼在子線程中沒有激活這個session作用域,導緻注入失敗,估計是Spring的設計問題.

是以參考RequestContextHolder,自己可以寫一個記錄結果以及錯誤資訊的LocalResut

 雖然我用了沒有問題,但是所有書籍記錄中,都不推薦在并發流或者多線程中使用有狀态對象,雖然我這不涉及競态條件,但是如果因為線程回收慢導緻ThreadLocal資料沒有回寫到主線程就已經傳回了,或者可見性問題,那麼結果也會有誤,而且不好debug

這裡用一個ThreadLocal 記錄結果集,或者錯誤資訊,等調用完并發流以後,可以join之後,輸出結果,注意線程可見性問題,這加上volatile 修飾.

public class LocalResult{
    private static volatile ThreadLocal<Map<String, String>> mapMessage = new InheritableThreadLocal();

    public LocalError() {
    }


    public static void mapMessageCreate() {
        mapMessage.remove();
        mapMessage.set(new HashMap());
    }

    public static ThreadLocal<Map<String, String>> getMapMessage() {
        return mapMessage;
    }
}      

在這裡根據訂單的id數組,進行批量的異步結算,在結算方法中,如果有錯誤資訊,依然會調用LocalResult,

線程池等于需要并發結算的數量+3 是一個容錯,避免線程池的Thead不夠,導緻ThreadLocal資料亂套

for開啟異步方法,之後馬上關閉線程池

然後還要等待線程池所有線程已經回收以後才讓方法傳回:是因為線程執行完方法,不一定立即回收,當線程已經開啟,馬上調用線程池的shutdown方法,告訴線程執行完後,盡快回收.

然後循環判斷線程池是否完全回收後,才傳回因為,根據join()規則。假設線程A在執行的過程中,通過執行ThreadB.join()來等待線

程B終止;同時,假設線程B在終止之前修改了一些共享變量,線程A從ThreadB.join()傳回後會

讀這些共享變量。

.

public void doAsync(Integer [] ids) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>(ids.length);
        ExecutorService executor = Executors.newFixedThreadPool(ids.length+3);
        for (Integer id : ids) {
            Order order = orderService.findEntityById(id);
            if (Order.Type.Failure.getCode().equals(orderInfo.getType())) {
                LocalResult.getMapMessage().get().put(order.getOrderNo(), "單号:" + order.getOrderNo() + ",已廢棄,不能結算!");
                continue;
            }
            futures.add(CompletableFuture.supplyAsync(() -> this.do(orderInfo),executor));
        }
        executor.shutdown();
        futures.stream().map(CompletableFuture::join);
        while (!executor.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                break;
            }
        }
    }      

 controller

@PostMapping(path = "/do",produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public ResultDataDto billing(@RequestParam("ids") Integer[] ids) {
        LocalResult.mapMessageCreate();
        service.doAsync(ids);
        if (LocalResult.getMapMessage().get().size() > 0) {
            StringBuilder stringBuilder = new StringBuilder();
            for (Map.Entry<String, String> entry : LocalResult.getMapMessage().get().entrySet()) {
                stringBuilder.append("單号:" + entry.getKey() + ":" + entry.getValue() + "</br>");
            }
            return ResultDataDto.addOperationSuccess("以下失敗!:</br>" + stringBuilder.toString());
        }
        return ResultDataDto.addOperationSuccess("結算成功!");
    }      

繼續閱讀