-
方法一:使用Semaphore解决
思路:设置两个信号量availableItems和availableSpaces,每次put和take之前先尝试获取对应的信号量(down操作),获取不到就阻塞;获取到了调用底层的两个受this锁保护的doInsert和doExtract方法,在这两个方法中会将对应的信号量做up操作
@ThreadSafe public class SemaphoreBoundedBuffer<E> { private final Semaphore availableItems, availableSpaces; @GuardedBy("this") private final E[] items; @GuardedBy("this") private int putPosition = 0, takePosition = 0; public SemaphoreBoundedBuffer(int capacity) { if (capacity <= 0) { throw new IllegalArgumentException(); } availableItems = new Semaphore(0); availableSpaces = new Semaphore(capacity); items = (E[]) new Object[capacity]; } ... public void put(E x) throws InterruptedException { availableSpaces.acquire(); doInsert(x); availableItems.release(); } public E take() throws InterruptedException { availableItems.acquire(); E item = doExtract(); availableSpaces.release(); return item; } private synchronized void doInsert(E x) { int i = putPosition; items[i] = x; putPosition = (++i == items.length) ? 0 : i; } private synchronized E doExtract() { int i = takePosition; E x = items[i]; items[i] = null; takePosition = (++i == items.length) ? 0 : i; return x; } }
-
方法二:用Condition解决(这也是ArrayBlockingQueue的实现思路)
思路:用一个Lock对象保护put和take方法,初始化时生成同一个Lock的两个Condition对象notFull和notEmpty,如果put或take时不满足条件,那就在对应的Condition对象上条件等待;如果满足条件,就做各种操作,并且signal另一个Condition对象
@ThreadSafe public class ConditionBoundedBuffer<T> { private static final int BUFFER_SIZE = 100; protected final Lock lock = new ReentrantLock(); // CONDITION PREDICATE: notFull (count < items.length) private final Condition notFull = lock.newCondition(); // CONDITION PREDICATE: notEmpty (count > 0) private final Condition notEmpty = lock.newCondition(); @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE]; @GuardedBy("lock") private int tail, head, count; // BLOCKS-UNTIL: notFull public void put(T x) throws InterruptedException { lock.lock(); try { while (count == items.length) { notFull.await(); } items[tail] = x; if (++tail == items.length) { tail = 0; } ++count; notEmpty.signal(); } finally { lock.unlock(); } } // BLOCKS-UNTIL: notEmpty public T take() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } T x = items[head]; items[head] = null; if (++head == items.length) { head = 0; } --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
-
这两种实现方式的区别在于:
使用信号量时要先做down操作,然后再获得锁;而使用条件变量时就没有这样的限制。原因在于条件变量wait时会让出当前的锁给其他线程,而信号量阻塞不会释放锁,所以顺序就不能反