天天看点

chapter02_进程与线程_5_生产者-消费者问题

  • 方法一:使用Semaphore解决

    思路:设置两个信号量availableItems和availableSpaces,每次put和take之前先尝试获取对应的信号量(down操作),获取不到就阻塞;获取到了调用底层的两个受this锁保护的doInsert和doExtract方法,在这两个方法中会将对应的信号量做up操作

    @ThreadSafe
      public class SemaphoreBoundedBuffer<E> {
    
          private final Semaphore availableItems, availableSpaces;
    
          @GuardedBy("this")
          private final E[] items;
    
          @GuardedBy("this")
          private int putPosition = 0, takePosition = 0;
    
          public SemaphoreBoundedBuffer(int capacity) {
    
              if (capacity <= 0) {
                  throw new IllegalArgumentException();
              }
    
              availableItems = new Semaphore(0);
              availableSpaces = new Semaphore(capacity);
              items = (E[]) new Object[capacity];
          }
    
          ...
    
          public void put(E x) throws InterruptedException {
    
              availableSpaces.acquire();
    
              doInsert(x);
    
              availableItems.release();
          }
    
          public E take() throws InterruptedException {
    
              availableItems.acquire();
    
              E item = doExtract();
    
              availableSpaces.release();
    
              return item;
          }
    
          private synchronized void doInsert(E x) {
    
              int i = putPosition;
              items[i] = x;
              putPosition = (++i == items.length) ? 0 : i;
          }
    
          private synchronized E doExtract() {
    
              int i = takePosition;
              E x = items[i];
              items[i] = null;
              takePosition = (++i == items.length) ? 0 : i;
              return x;
          }
      }
               
  • 方法二:用Condition解决(这也是ArrayBlockingQueue的实现思路)

    思路:用一个Lock对象保护put和take方法,初始化时生成同一个Lock的两个Condition对象notFull和notEmpty,如果put或take时不满足条件,那就在对应的Condition对象上条件等待;如果满足条件,就做各种操作,并且signal另一个Condition对象

    @ThreadSafe
      public class ConditionBoundedBuffer<T> {
    
          private static final int BUFFER_SIZE = 100;
    
          protected final Lock lock = new ReentrantLock();
    
          // CONDITION PREDICATE: notFull (count < items.length)
          private final Condition notFull = lock.newCondition();
    
          // CONDITION PREDICATE: notEmpty (count > 0)
          private final Condition notEmpty = lock.newCondition();
    
          @GuardedBy("lock")
          private final T[] items = (T[]) new Object[BUFFER_SIZE];
    
          @GuardedBy("lock")
          private int tail, head, count;
    
          // BLOCKS-UNTIL: notFull
          public void put(T x) throws InterruptedException {
    
              lock.lock();
    
              try {
                  while (count == items.length) {
                      notFull.await();
                  }
    
                  items[tail] = x;
    
                  if (++tail == items.length) {
                      tail = 0;
                  }
    
                  ++count;
                  notEmpty.signal();
    
              } finally {
                  lock.unlock();
              }
          }
    
          // BLOCKS-UNTIL: notEmpty
          public T take() throws InterruptedException {
    
              lock.lock();
    
              try {
                  while (count == 0) {
                      notEmpty.await();
                  }
    
                  T x = items[head];
                  items[head] = null;
    
                  if (++head == items.length) {
                      head = 0;
                  }
    
                  --count;
    
                  notFull.signal();
    
                  return x;
    
              } finally {
                  lock.unlock();
              }
          }
      }
               
  • 这两种实现方式的区别在于:

    使用信号量时要先做down操作,然后再获得锁;而使用条件变量时就没有这样的限制。原因在于条件变量wait时会让出当前的锁给其他线程,而信号量阻塞不会释放锁,所以顺序就不能反