天天看点

有状态InheritableThreadLocal 配合 JDK8 ,异步方法调用

我们可以把一个类的作用域注解为

@Scope(scopeName = WebApplicationContext.SCOPE_SESSION, proxyMode = ScopedProxyMode.TARGET_CLASS)      

这样这个类就能在session中获取,可以把用户信息放到这个类中,需要的时候,直接@Autowire进来就好了.

但是这样有一个坑.在主线程中,如果使用JDK异步方法,或者自己new出新的线程中,没有办法注入.会提示一个异常

Scope 'session' is not active for the current thread; consider defining a scoped proxy for this bean if you intend to refer to it from a singleton;      

这个异常非常误导人,在单线程下是不会提示的.提示了这个异常,原因是:session 是根据request获取的,那么Spring是用requestContextHolder实现的,原理还是InheritableThreadLocal ,按道理来说,在子线程也是可以获取到父线程的request.getSession,至于为什么在子线程中没有激活这个session作用域,导致注入失败,估计是Spring的设计问题.

所以参考RequestContextHolder,自己可以写一个记录结果以及错误信息的LocalResut

 虽然我用了没有问题,但是所有书籍记录中,都不推荐在并发流或者多线程中使用有状态对象,虽然我这不涉及竞态条件,但是如果因为线程回收慢导致ThreadLocal数据没有回写到主线程就已经返回了,或者可见性问题,那么结果也会有误,而且不好debug

这里用一个ThreadLocal 记录结果集,或者错误信息,等调用完并发流以后,可以join之后,输出结果,注意线程可见性问题,这加上volatile 修饰.

public class LocalResult{
    private static volatile ThreadLocal<Map<String, String>> mapMessage = new InheritableThreadLocal();

    public LocalError() {
    }


    public static void mapMessageCreate() {
        mapMessage.remove();
        mapMessage.set(new HashMap());
    }

    public static ThreadLocal<Map<String, String>> getMapMessage() {
        return mapMessage;
    }
}      

在这里根据订单的id数组,进行批量的异步结算,在结算方法中,如果有错误信息,依然会调用LocalResult,

线程池等于需要并发结算的数量+3 是一个容错,避免线程池的Thead不够,导致ThreadLocal数据乱套

for开启异步方法,之后马上关闭线程池

然后还要等待线程池所有线程已经回收以后才让方法返回:是因为线程执行完方法,不一定立即回收,当线程已经开启,马上调用线程池的shutdown方法,告诉线程执行完后,尽快回收.

然后循环判断线程池是否完全回收后,才返回因为,根据join()规则。假设线程A在执行的过程中,通过执行ThreadB.join()来等待线

程B终止;同时,假设线程B在终止之前修改了一些共享变量,线程A从ThreadB.join()返回后会

读这些共享变量。

.

public void doAsync(Integer [] ids) {
        List<CompletableFuture<Boolean>> futures = new ArrayList<>(ids.length);
        ExecutorService executor = Executors.newFixedThreadPool(ids.length+3);
        for (Integer id : ids) {
            Order order = orderService.findEntityById(id);
            if (Order.Type.Failure.getCode().equals(orderInfo.getType())) {
                LocalResult.getMapMessage().get().put(order.getOrderNo(), "单号:" + order.getOrderNo() + ",已作废,不能结算!");
                continue;
            }
            futures.add(CompletableFuture.supplyAsync(() -> this.do(orderInfo),executor));
        }
        executor.shutdown();
        futures.stream().map(CompletableFuture::join);
        while (!executor.isTerminated()) {
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                break;
            }
        }
    }      

 controller

@PostMapping(path = "/do",produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public ResultDataDto billing(@RequestParam("ids") Integer[] ids) {
        LocalResult.mapMessageCreate();
        service.doAsync(ids);
        if (LocalResult.getMapMessage().get().size() > 0) {
            StringBuilder stringBuilder = new StringBuilder();
            for (Map.Entry<String, String> entry : LocalResult.getMapMessage().get().entrySet()) {
                stringBuilder.append("单号:" + entry.getKey() + ":" + entry.getValue() + "</br>");
            }
            return ResultDataDto.addOperationSuccess("以下失败!:</br>" + stringBuilder.toString());
        }
        return ResultDataDto.addOperationSuccess("结算成功!");
    }      

继续阅读