天天看点

常用线程同步类CountDownLatch、CyclicBarrier用法引子CountDownLatchCyclicBarrier

引子

随着时代的发展,Object当初的抽象模型部分不适用当下的技术潮流,比如finalize()

方法在JDK9 之后直接被标记为过时方法。而wait()和notify() 同步方式事实上已经

被同步信号、锁、阻塞集合等取代。

                                                                                                           —— 《码出高效》

那么,如何用好线程同步类将是关键,以下将详细给大家介绍几个线程同步类的详细使用场景,方便大家在工作中使用。

CountDownLatch

功能讲解

常用于监听某些初始化操作,等待其他子线程初始化执行完毕,主线程再继续执行。

大体步骤为:

1、向CountDownLatch对象设置一个初始计数值

2、主线程通过CountDownLatch#await()方法进行阻塞

3、其他子线程初始化执行完毕后,调用CountDownLatch#countDown()对计数值减1,直到计数值降为0,此时表示所有资源初始化完毕。

4、主线程的CountDownLatch#await()方法不再阻塞,主线程继续往下走。

栗子1

这里通过Future#get()进行子线程是否异常的判断,如果有子线程异常,则直接往上抛出异常。Future#get()触发异常可参考blog:https://blog.csdn.net/y124675160/article/details/104399114 中关于CompleteService源码解析的部分。

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * @author tingyu
 * @date 2020/12/12
 */

@Slf4j
public class CountDownLatchDemo {

    private static ThreadFactory threadFactory =
            new ThreadFactoryBuilder().setNameFormat("sourceInitExecutor").build();
    // 根据实际场景进行线程池大小等配置
    // 如果任务多数为排队执行,对队列要求高的,建议使用MQ的队列,将排队的压力交给云端,线程池只做执行使用
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory);

    public static void main(String[] args) throws Exception {

        CountDownLatch countDownLatch = new CountDownLatch(3);
        List<Future<Integer>> sourceFutureList = new ArrayList<>(10);
        sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceA")));
        sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceB")));
        sourceFutureList.add(threadPoolExecutor.submit(new SourceCallable(countDownLatch, "SourceC")));
        // 主线程在此处阻塞,直到所有子线程资源初始化完毕!
        countDownLatch.await(10, TimeUnit.SECONDS);
        sourceFutureList.forEach(future -> {
            try {
                future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        log.info("资源准备完毕,主线程继续向下执行!");
    }

    static class SourceCallable implements Callable<Integer> {

        static Random random = new Random();

        private CountDownLatch countDownLatch;
        private String name;

        SourceCallable(CountDownLatch countDownLatch, String name) {
            this.countDownLatch = countDownLatch;
            this.name = name;
        }

        @Override
        public Integer call() {
            try {
                // 开始资源初始化
                /********** 模拟资源初始化 ***************/
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                log.info("【{}】资源初始化完毕!", name);
            } catch (Exception e) {
                // 执行一些需要清理的工作
                /********** 模拟资源清理 ***************/
                log.error("【{}】资源初始化异常!", name, e);
                throw new RuntimeException(e);
            }finally {
                countDownLatch.countDown();
            }
            return 1;
        }
    }
}
           

Copy

栗子2

上家公司的时候用CountDownLatch实现过一个功能,详情可参考:https://www.atatech.org/articles/162136

注意事项

CountDownLatch被设计为只触发一次,计数器不能被重置。如果需要能被重置计数器的版本,则可以使用CyclicBarrier。

countDown调用时要预防前面的代码抛异常,导致countDown未能执行,从而导致await一直阻塞。因此可以在异常捕获内部再调用一次,或者统一在异常捕获之后进行调用,或者在finally里面执行。

CyclicBarrier

功能讲解

适用于你希望创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直到所有任务都完成。它使得所有的并行任务都将在栅栏处列队,因此可以一致地向前移动。这非常像CountDownLatch,只是CountDownLatch是只触发一次的事件,而CyclicBarrier可以多次重用。

栗子1

在上家公司时,由于机构 DIY 课程定制需要从固定课程复制,而复制需要调用三个小组的微服务,三个微服务分别为 课程创建、讲次创建、卷子创建,讲次挂在课程上,卷子挂在讲次上,关系如下:

常用线程同步类CountDownLatch、CyclicBarrier用法引子CountDownLatchCyclicBarrier

但是后面发现这样耗时太长,因此改为使用id生成器提前生成好班型id,再通过传入班型id,同时进行班型、讲次、卷子的复制,这时候需要保证3个服务中任何一个出现异常的时候,所有已经创建完成的任务数据需要回滚(此处的回滚为调用对应微服务提供的数据清除接口进行)。

一开始使用CompleteService + CountDownLatch + 线程池实现,但后面发现实现复杂,并且线程池只做通信,并没有发挥真正的作用,因此之后使用CyclicBarrier实现了一个优化版本,以下为优化版本的示例。 (CountDownLatch版可参考blog:https://blog.csdn.net/y124675160/article/details/104399114 其中还有关于ExecutorCompletionService的源码讲解)

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.util.*;
import java.util.concurrent.*;

/**
 * @author tingyu
 * @date 2020/12/12
 */

@Slf4j
public class ClassTypeCreateDemo {

    private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("ClassTypeCreateExecutor").build();
    // 初始化线程池,这里的线程数等配置根据实际场景进行配置,由于此处多数为调用远程微服务作业,为IO密集型,可以设置大一点。
    // 如果任务多数为排队执行,对队列要求高的,建议使用MQ的队列,将排队的压力交给云端,线程池只做执行使用
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 20,
            60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), threadFactory);

    public static void main(String[] args) throws InterruptedException {

        // 设置任务执行结果的初始值
        Map<String, Boolean> execResultMap = new HashMap<>(16);
        execResultMap.put("ClassType", true);
        execResultMap.put("Lesson", true);
        execResultMap.put("LessonPaper", true);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        // 讲次创建
        threadPoolExecutor.submit(new LessonRunnable(cyclicBarrier, "Lesson", execResultMap));
        // 卷子创建
        threadPoolExecutor.submit(new LessonPaperRunnable(cyclicBarrier, "LessonPaper", execResultMap));
        // 课程创建
        Future<Integer> claaTypeFuture =
                threadPoolExecutor.submit(new ClassTypeCallable(cyclicBarrier, "ClassType", execResultMap));

        try {
            Integer classTypeId = claaTypeFuture.get();
            log.info("课程创建成功,课程ID为:{}", classTypeId);
        } catch (ExecutionException e) {
            // 课程创建失败
            log.error("课程创建失败!原因为:{}", e.getMessage(), e);
            throw new RuntimeException("课程创建失败!", e);
        }
    }

    /**
     * 课程创建任务
     */
    static class ClassTypeCallable implements Callable<Integer> {

        static Random random = new Random();

        private CyclicBarrier cyclicBarrier;
        private String name;
        private Map<String, Boolean> execResultMap;

        ClassTypeCallable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.execResultMap = execResultMap;
        }

        @Override
        public Integer call() {

            Integer classTypeId = null;
            try {
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                // 模拟课程创建成功后的id赋值
                classTypeId = random.nextInt(10000);
                log.info("课程创建成功!");
            } catch (Exception e) {
                log.error("课程创建异常!", e);
                // 将ClassType的处理状态设置为false
                execResultMap.put(name, false);
            }

            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                log.warn("【{}】调用await异常!", name, e);
            }

            Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
            entries.forEach(entry -> {
                if(!entry.getValue()) {
                    log.warn("【{}】任务创建异常,【{}】进行回滚!", entry.getKey(), name);
                    /********** 数据清理的操作模拟 ****************/
                    throw new RuntimeException("【" + entry.getKey() + "】任务创建异常!");
                }
            });
            return classTypeId;
        }
    }

    /**
     * 讲次创建任务
     */
    static class LessonRunnable implements Runnable {

        static Random random = new Random();

        private CyclicBarrier cyclicBarrier;
        private String name;
        private Map<String, Boolean> execResultMap;

        LessonRunnable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.execResultMap = execResultMap;
        }

        @Override
        public void run() {
            try {
                // 开始资源初始化
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                log.info("讲次创建成功!");
            } catch (Exception e) {
                log.error("讲次创建异常!", e);
                // 将Lesson的处理状态设置为false
                execResultMap.put(name, false);
            }

            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                log.warn("【{}】调用await异常!", name, e);
            }

            Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
            entries.forEach(entry -> {
                if(!entry.getValue()) {
                    log.warn("【{}】任务创建异常,【{}】进行回滚!", entry.getKey(), name);
                    /********** 数据清理的操作模拟 ****************/
                }
            });
        }
    }

    /**
     * 卷子创建任务
     */
    static class LessonPaperRunnable implements Runnable {

        static Random random = new Random();

        private CyclicBarrier cyclicBarrier;
        private String name;
        private Map<String, Boolean> execResultMap;

        LessonPaperRunnable(CyclicBarrier cyclicBarrier, String name, Map<String, Boolean> execResultMap) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
            this.execResultMap = execResultMap;
        }

        @Override
        public void run() {
            try {
                // 开始资源初始化
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                log.info("卷子创建成功!");
            } catch (Exception e) {
                log.error("卷子创建异常!", e);
                // 将LessonPaper的处理状态设置为false
                execResultMap.put(name, false);
            }

            try {
                cyclicBarrier.await();
            } catch (Exception e) {
                log.warn("【{}】调用await异常!", name, e);
            }

            Set<Map.Entry<String, Boolean>> entries = execResultMap.entrySet();
            entries.forEach(entry -> {
                if(!entry.getValue()) {
                    log.warn("【{}】任务创建异常,【{}】进行回滚!", entry.getKey(), name);
                    /********** 数据清理的操作模拟 ****************/
                }
            });
        }
    }
}
           

Copy

栗子2

以下栗子摘自《Thinking in Java》,作为经典的入门书籍,第一次认识到CyclicBarrier也是通过这本书,当时就觉得这个栗子很有意思。

下面的代码主要是模拟了赛马比赛,使用"==="表示栅栏,"***"表示目前马儿跑过的距离,*最右边是每批马儿的编号,可以拷贝代码导自己的工程后执行main方法看下运行效果。

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 这里模拟了赛马,使用"==="表示栅栏,"***"表示目前马儿跑过的距离,*最右边是每批马儿的编号
 * 如以下例子,则表示6号马儿跑在最前面
 * 可以执行main方法看下运行效果
 * ==================================================
 * **********************************************1
 * *****************************2
 * *********************************************3
 * ****************************************4
 * *********************************************5
 * ************************************************6
 * *****************************************7
 * ==================================================
 **/

class Horse implements Runnable{
    private static int counter = 1;
    // 马的编号
    private final int id = counter++;
    // 当前马所走的步数
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;

    public Horse(CyclicBarrier b){
        barrier = b;
    }
    // 返回当前马所跑的步数
    public synchronized int getStrides(){
        return strides;
    }

    public void run(){
        try
        {
            while(!Thread.interrupted()){
                synchronized(this){
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        }catch(InterruptedException ex){
            System.out.println(this+ " 通过中断异常退出");
        }catch(BrokenBarrierException e){
            throw new RuntimeException();
        }
    }

    public String toString(){
        return "Horse " + id + " ";
    }

    // 使用"*"表示当前马的轨迹
    public String tracks(){
        StringBuilder s = new StringBuilder();
        for(int i = 0 ; i < getStrides(); i++){
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}

public class HorseRace {
      static final int FINISH_LINE = 50;
      private List<Horse> horses = new ArrayList<>();
      private ExecutorService exec = Executors.newCachedThreadPool();
      private CyclicBarrier barrier;

      public HorseRace(int nHorse,final int pause){

          // 初始化栅栏, nHorse即为模拟的马儿数量,代表nHorse只马儿都到达await位置后,执行Runnable线程的方法
          barrier = new CyclicBarrier(nHorse,new Runnable(){
              public void run(){

                  StringBuilder s = new StringBuilder();
                  for(int i = 0; i < FINISH_LINE; i++){
                       s.append("=");
                  }
                  System.out.println(s);
                  for(Horse horse : horses){
                      System.out.println(horse.tracks());
                  }

                  for(Horse horse : horses){
                      if(horse.getStrides() >= FINISH_LINE){
                          System.out.println(horse + " won");
                          exec.shutdownNow();
                          return;
                      }
                  }

                  try{
                      // 睡眠一段时间,模拟马奔跑的时间
                      TimeUnit.MILLISECONDS.sleep(pause);
                  }catch(InterruptedException ex){
                      ex.printStackTrace();
                  }
              }
          });

          for(int i=0; i < nHorse;i++){
              Horse horse = new Horse(barrier);
              horses.add(horse);
              exec.execute(horse);
          }
      }

      public static void main(String[] args){
          // 配置初始7匹马进行竞赛
          int nHorses = 7;
          // 每200毫秒更新一次马儿的移动位置
          int pause = 200;
          new HorseRace(nHorses,pause);
      }
}
           

注意事项

从以上马儿的例子可以看出,CyclicBarrier是可以重复使用的。