天天看點

[ZooKeeper]糾正官網的Queue示例

Queue接口

/**
 * Minimal producer/consumer queue contract.
 *
 * @param <E> type of the elements carried by the queue
 */
public interface Queue<E> {

	/**
	 * Hands one element over to the queue.
	 *
	 * @param e the element to enqueue
	 * @return a success flag whose exact meaning is left to the implementation
	 * @throws QueueException       if the implementation fails to store the element
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	boolean produce(E e) throws QueueException, InterruptedException;

	/**
	 * Takes one element out of the queue.
	 *
	 * @return the element that was removed
	 * @throws QueueException       if the implementation fails to fetch an element
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	E consume() throws QueueException, InterruptedException;

}
           

單機版的隊列用BlockingDeque或BlockingQueue就能實作

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
 * Single-process queue: a thin wrapper around a {@link LinkedBlockingDeque}
 * created with its default capacity.
 *
 * @param <E> type of the queued elements
 */
public class StandaloneQueue<E> {

	/** Backing store; elements enter at the tail and leave from the head. */
	private final BlockingDeque<E> items = new LinkedBlockingDeque<>();

	/** Creates an empty queue. */
	public StandaloneQueue() {
	}

	/**
	 * Appends an element at the tail without blocking.
	 *
	 * @param e the element to enqueue
	 * @return {@code true} if the element was added, {@code false} if the deque had no room
	 */
	public boolean produce(E e) {
		// BlockingDeque.offer(e) is specified as equivalent to offerLast(e).
		return items.offer(e);
	}

	/**
	 * Removes the head element, blocking until one is available.
	 *
	 * @return the element that was at the head of the queue
	 * @throws InterruptedException if interrupted while waiting
	 */
	public E consume() throws InterruptedException {
		// BlockingDeque.take() is specified as equivalent to takeFirst().
		return items.take();
	}
}
           

分布式版的Queue,對于getData和delete時産生的NONODE異常可以放過(NONODE表示該元素已被其他消費者搶先取走,重新讀取子節點再試即可)

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

/**
 * Distributed FIFO queue of integers stored in ZooKeeper.
 *
 * <p>Every element is a {@code PERSISTENT_SEQUENTIAL} child of {@code root}
 * whose name is {@code queueName} followed by the sequence suffix that
 * ZooKeeper appends. Consumers take the child with the smallest sequence
 * number. Several consumers may race for the same child; the loser sees
 * {@code NONODE} and simply retries.
 *
 * <p>The instance is also the default watcher of its ZooKeeper session, so
 * {@link #process(WatchedEvent)} is invoked by the ZooKeeper client.
 */
public class DistributedQueue implements Queue<Integer>, Watcher {

	/** Session timeout, in milliseconds, passed to the ZooKeeper client. */
	private static final int SESSION_TIMEOUT_MS = 3000;

	private final String root;

	private final String queueName;

	private final ZooKeeper zooKeeper;

	private final Lock lock;

	private final Condition nodeChildrenChange;

	/**
	 * Number of wake-up notifications delivered so far. Guarded by {@link #lock}.
	 * Consumers snapshot it before listing children so that a notification
	 * arriving before they start waiting is not lost.
	 */
	private long changeCount;

	private volatile boolean expired;

	/**
	 * Connects to ZooKeeper and makes sure the root node exists.
	 *
	 * @param address   ZooKeeper connect string
	 * @param root      absolute path of the parent node holding the queue elements
	 * @param queueName name prefix of every element node
	 * @throws QueueException       if the connection cannot be created or the root
	 *                              node cannot be checked/created
	 * @throws InterruptedException if interrupted while talking to ZooKeeper
	 */
	public DistributedQueue(String address, String root, String queueName) throws QueueException, InterruptedException {
		this.root = root;
		this.queueName = queueName;

		// Must be ready before the ZooKeeper client starts delivering events to process().
		lock = new ReentrantLock();
		nodeChildrenChange = lock.newCondition();

		try {
			zooKeeper = new ZooKeeper(address, SESSION_TIMEOUT_MS, this);
		} catch (IOException e) {
			throw new QueueException(e);
		}

		try {
			Stat stat = zooKeeper.exists(root, false);
			if (stat == null) {
				zooKeeper.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (KeeperException e) {
			// NODEEXISTS: another client created the root between exists() and create().
			if (e.code() != Code.NODEEXISTS) {
				closeQuietly(); // do not leak the session when construction fails
				throw new QueueException(e);
			}
		} catch (InterruptedException e) {
			closeQuietly();
			throw e;
		}
	}

	/**
	 * Appends a value to the queue by creating a new sequential child node
	 * whose data is the 4-byte encoding of the value.
	 *
	 * @param i the value to enqueue
	 * @return always {@code true} when no exception is thrown
	 * @throws QueueException       if ZooKeeper rejects the create
	 * @throws InterruptedException if interrupted while talking to ZooKeeper
	 */
	@Override
	public boolean produce(Integer i) throws InterruptedException, QueueException {
		ByteBuffer b = ByteBuffer.allocate(4);
		b.putInt(i);
		byte[] value = b.array();

		try {
			zooKeeper.create(root + "/" + queueName, value, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
			return true;
		} catch (KeeperException e) {
			throw new QueueException(e);
		}
	}

	/**
	 * Removes and returns the oldest value, blocking while the queue is empty.
	 *
	 * @return the value stored in the element with the smallest sequence number
	 * @throws QueueException       if ZooKeeper reports an error other than
	 *                              {@code NONODE} on the element, or the session expired
	 * @throws InterruptedException if the calling thread is interrupted
	 */
	@Override
	public Integer consume() throws QueueException, InterruptedException {
		while (!expired) {
			if (Thread.interrupted()) {
				// Report interruption as such instead of clearing the flag silently.
				throw new InterruptedException();
			}

			// Snapshot BEFORE setting the watch: any change event fired after
			// this point bumps the counter and is therefore never missed.
			long seenChanges = currentChangeCount();

			List<String> children;
			try {
				children = zooKeeper.getChildren(root, true);
			} catch (KeeperException e) {
				throw new QueueException(e);
			}

			String head = oldestElement(children);
			if (head == null) {
				awaitChildrenChange(seenChanges);
				continue;
			}

			// Use the child name exactly as ZooKeeper returned it; rebuilding it
			// from the parsed number would drop the zero padding of the suffix.
			String path = root + "/" + head;
			try {
				byte[] value = zooKeeper.getData(path, false, null);
				// This class never updates an element after creating it, so the
				// data version it expects is still 0.
				zooKeeper.delete(path, 0);
				return ByteBuffer.wrap(value).getInt();
			} catch (KeeperException e) {
				if (e.code() != Code.NONODE) {
					throw new QueueException(e);
				}
				// Another consumer already took this element; list the children again.
			}
		}
		throw new QueueException("session expired");
	}

	/**
	 * Handles session events and child-change notifications from ZooKeeper.
	 *
	 * @param event the event delivered by the ZooKeeper client
	 */
	@Override
	public void process(WatchedEvent event) {
		if (event.getType() == Watcher.Event.EventType.None) {
			if (event.getState() == Watcher.Event.KeeperState.Expired) {
				expired = true;
				// Wake blocked consumers so they can observe the expiry and fail.
				wakeConsumers();
				closeQuietly();
			}
		} else if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
			wakeConsumers();
		}
	}

	/** Returns the current notification count, read under the lock. */
	private long currentChangeCount() {
		lock.lock();
		try {
			return changeCount;
		} finally {
			lock.unlock();
		}
	}

	/** Blocks until a notification newer than {@code seenChanges} arrives or the session expires. */
	private void awaitChildrenChange(long seenChanges) throws InterruptedException {
		lock.lock();
		try {
			while (changeCount == seenChanges && !expired) {
				nodeChildrenChange.await();
			}
		} finally {
			lock.unlock();
		}
	}

	/** Records one notification and wakes every consumer waiting on the condition. */
	private void wakeConsumers() {
		lock.lock();
		try {
			changeCount++;
			nodeChildrenChange.signalAll();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Picks the child belonging to this queue that has the smallest sequence number.
	 * Children without the {@code queueName} prefix or without a numeric suffix are ignored.
	 *
	 * @return the child node name, or {@code null} if no element is present
	 */
	private String oldestElement(List<String> children) {
		String oldest = null;
		int oldestSequence = 0;
		for (String child : children) {
			if (!child.startsWith(queueName)) {
				continue;
			}
			int sequence;
			try {
				sequence = Integer.parseInt(child.substring(queueName.length()));
			} catch (NumberFormatException ignored) {
				// Not an element of this queue (e.g. a node of a queue with a longer name).
				continue;
			}
			if (oldest == null || sequence < oldestSequence) {
				oldest = child;
				oldestSequence = sequence;
			}
		}
		return oldest;
	}

	/** Closes the ZooKeeper handle, preserving the interrupt status if the close is interrupted. */
	private void closeQuietly() {
		try {
			zooKeeper.close();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}
}