Queue接口
/**
 * Minimal producer/consumer queue contract.
 *
 * @param <E> type of the elements carried by the queue
 */
public interface Queue<E> {

    /**
     * Hands one element to the queue.
     *
     * @param e the element to enqueue
     * @return a success flag; its exact meaning is defined by the implementation
     * @throws QueueException       if the underlying queue reports a failure
     * @throws InterruptedException if the calling thread is interrupted
     */
    boolean produce(E e) throws QueueException, InterruptedException;

    /**
     * Takes one element out of the queue.
     *
     * @return the element that was removed
     * @throws QueueException       if the underlying queue reports a failure
     * @throws InterruptedException if the calling thread is interrupted
     */
    E consume() throws QueueException, InterruptedException;
}
單機版的隊列用BlockingDeque或BlockingQueue就能實作
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
 * Single-process (standalone) queue kept entirely in memory.
 *
 * <p>Elements are appended at the tail of a {@link LinkedBlockingDeque} and
 * removed from its head, so they come out in insertion order.
 *
 * @param <E> type of the queued elements
 */
public class StandaloneQueue<E> {

    /** Backing deque, created with the no-argument constructor. */
    private final BlockingDeque<E> deque = new LinkedBlockingDeque<>();

    /** Creates an empty queue. */
    public StandaloneQueue() {
    }

    /**
     * Appends an element at the tail of the queue without blocking.
     *
     * @param e the element to add
     * @return the result reported by the backing deque's {@code offer}
     */
    public boolean produce(E e) {
        boolean accepted = deque.offer(e);
        return accepted;
    }

    /**
     * Removes the head of the queue, waiting until an element is available.
     *
     * @return the element that was at the head of the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public E consume() throws InterruptedException {
        E head = deque.take();
        return head;
    }
}
分布式版的Queue,對于getData和delete時産生的NONODE異常可以放過
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
 * Distributed queue of integers backed by ZooKeeper.
 *
 * <p>Every element is stored as a {@code PERSISTENT_SEQUENTIAL} child of
 * {@code root} whose name starts with {@code queueName}; the 4-byte payload is
 * the big-endian encoding of the integer. Consumers take the child with the
 * smallest sequence number. A {@code NONODE} error while reading or deleting
 * that child means another consumer got there first and is silently retried.
 *
 * <p>The instance registers itself as the ZooKeeper watcher: child-change
 * events wake up blocked consumers, and a session-expiry event marks the queue
 * as unusable.
 */
public class DistributedQueue implements Queue<Integer>, Watcher {

    /** Session timeout, in milliseconds, passed to the ZooKeeper client. */
    private static final int SESSION_TIMEOUT_MS = 3000;

    private final String root;
    private final String queueName;
    private ZooKeeper zooKeeper;
    private final Lock lock;
    private final Condition nodeChildrenChange;
    private volatile boolean expired;

    /**
     * Connects to ZooKeeper and makes sure the root node exists.
     *
     * @param address   ZooKeeper connect string
     * @param root      absolute path of the node that holds the queue elements
     * @param queueName name prefix of the sequential element nodes
     * @throws QueueException       if the connection cannot be created or the
     *                              root node cannot be checked/created
     * @throws InterruptedException if the thread is interrupted during a
     *                              ZooKeeper call
     */
    public DistributedQueue(String address, String root, String queueName) throws QueueException, InterruptedException {
        this.root = root;
        this.queueName = queueName;
        lock = new ReentrantLock();
        nodeChildrenChange = lock.newCondition();
        try {
            zooKeeper = new ZooKeeper(address, SESSION_TIMEOUT_MS, this);
        } catch (IOException e) {
            throw new QueueException(e);
        }
        boolean initialized = false;
        try {
            Stat stat = zooKeeper.exists(root, false);
            if (stat == null) {
                zooKeeper.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            initialized = true;
        } catch (KeeperException e) {
            // Losing the creation race to another client is fine.
            if (e.code() != Code.NODEEXISTS) {
                throw new QueueException(e);
            }
            initialized = true;
        } finally {
            // Do not leak the client handle when construction fails.
            if (!initialized) {
                closeQuietly();
            }
        }
    }

    /**
     * Appends an integer to the queue by creating a new sequential node.
     *
     * @param i the value to enqueue
     * @return {@code true} once the node has been created
     * @throws QueueException       if ZooKeeper rejects the creation
     * @throws InterruptedException if the thread is interrupted during the call
     */
    @Override
    public boolean produce(Integer i) throws InterruptedException, QueueException {
        ByteBuffer b = ByteBuffer.allocate(4);
        b.putInt(i);
        byte[] value = b.array();
        try {
            zooKeeper.create(root + "/" + queueName, value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
            return true;
        } catch (KeeperException e) {
            throw new QueueException(e);
        }
    }

    /**
     * Removes and returns the oldest element, blocking while the queue is empty.
     *
     * @return the value stored in the element node with the smallest sequence
     * @throws QueueException       if the session has expired or ZooKeeper
     *                              reports an error other than {@code NONODE}
     * @throws InterruptedException if the thread is interrupted
     */
    @Override
    public Integer consume() throws QueueException, InterruptedException {
        while (true) {
            // Use the child name exactly as ZooKeeper returned it: rebuilding it
            // from a parsed number would drop the zero padding of the suffix.
            String path = root + "/" + awaitOldestChild();
            try {
                byte[] value = zooKeeper.getData(path, false, null);
                zooKeeper.delete(path, 0);
                return ByteBuffer.wrap(value).getInt();
            } catch (KeeperException e) {
                if (e.code() != Code.NONODE) {
                    throw new QueueException(e);
                }
                // The element was already taken by another consumer; try again.
            }
        }
    }

    /**
     * Blocks until at least one element node of this queue exists and returns
     * the name of the one with the smallest sequence number.
     *
     * <p>The lock is held from the moment the child watch is registered until
     * {@code await()} releases it, so a notification delivered in between
     * cannot be missed.
     */
    private String awaitOldestChild() throws QueueException, InterruptedException {
        lock.lock();
        try {
            while (true) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                if (expired) {
                    throw new QueueException("ZooKeeper session expired");
                }
                List<String> children;
                try {
                    children = zooKeeper.getChildren(root, true);
                } catch (KeeperException e) {
                    throw new QueueException(e);
                }
                String oldest = oldestChild(children);
                if (oldest != null) {
                    return oldest;
                }
                nodeChildrenChange.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the child whose numeric suffix after {@code queueName} is the
     * smallest, or {@code null} when no child belongs to this queue.
     */
    private String oldestChild(List<String> children) {
        String oldest = null;
        int oldestSequence = 0;
        for (String child : children) {
            if (!child.startsWith(queueName)) {
                continue;
            }
            int sequence;
            try {
                sequence = Integer.parseInt(child.substring(queueName.length()));
            } catch (NumberFormatException ignored) {
                // Shares the prefix but is not one of this queue's sequential nodes.
                continue;
            }
            if (oldest == null || sequence < oldestSequence) {
                oldest = child;
                oldestSequence = sequence;
            }
        }
        return oldest;
    }

    /**
     * Handles ZooKeeper notifications: wakes blocked consumers when the
     * children of a watched node change, and on session expiry marks the queue
     * as expired, wakes consumers so they can fail, and closes the client.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.None) {
            if (event.getState() == Watcher.Event.KeeperState.Expired) {
                expired = true;
                signalWaiters();
                closeQuietly();
            }
        } else if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            signalWaiters();
        }
    }

    /** Wakes every consumer currently blocked on the condition. */
    private void signalWaiters() {
        lock.lock();
        try {
            nodeChildrenChange.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Closes the client; an interrupt during close is kept as the thread's interrupt flag. */
    private void closeQuietly() {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}