天天看點

zookeeper 實作分布式鎖示例(四)

package com.julong.lock;

import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 分布式鎖實作示例
 * @author julong
 * @date 2021年10月15日 下午9:01:12
 * @desc  Distributed 發音 【dɪ'strɪbjutɪd】
 */
public class DistributedLock implements Lock,Watcher{

  /**
   * 定義zookeeper 對象
   * @author julong
   * @date 2021年10月16日 上午10:12:11
   */
  private ZooKeeper zooKeeper = null;

  /**
   * 定義鎖根節點
   * @author julong
   * @date 2021年10月16日 上午10:12:18
   */
  private String ROOT_LOCK = "/locks"; 

  /**
   * 等待前一個鎖
   * @author julong
   * @date 2021年10月16日 上午10:12:32
   */
  private String WAIT_UP_LOCK;

  /**
   * 辨別目前鎖
   * @author julong
   * @date 2021年10月16日 上午10:12:46
   */
  private String CURRENT_LOCK;

  /**
   * 任務
   * @author julong
   * @date 2021年10月16日 上午10:12:51
   */
  private CountDownLatch countDownLatch;


  /**
   * 構造函數
   * @author julong
   * @date 2021年10月16日 上午10:13:32
   */
  public DistributedLock() {
    super();
    // TODO Auto-generated constructor stub
    //在構造方法中實作 zk的加載
    try {
      this.zooKeeper = new ZooKeeper("192.168.10.100:2181", 4000, this);

      //此處為判斷zookeeper 是否已經連接配接  如果連接配接則退出循環
      while (true) {
        if(this.zooKeeper.getState().isConnected()){
          System.out.println("連接配接成功!");
          break;
        }
        System.out.println("連接配接失敗!");
        Thread.sleep(2000);
      }


      //判斷根節點是否存在
      Stat stat = this.zooKeeper.exists(this.ROOT_LOCK, false);
      if(stat == null){
        this.zooKeeper.create(this.ROOT_LOCK, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
      }

    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (KeeperException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

  }

  @Override
  public void process(WatchedEvent event) {
    // TODO Auto-generated method stub
    if(null != this.countDownLatch){
      this.countDownLatch.countDown();
    }
  }

  @Override
  public void lock() {
    // TODO Auto-generated method stub
    if(this.tryLock()){//判斷是否獲得鎖成功
      System.out.println(Thread.currentThread().getName()+"->"+this.CURRENT_LOCK+"->節點獲得鎖成功!");
      return;
    }
    try {
      waitForUpLock(this.WAIT_UP_LOCK);//沒有獲得鎖繼續等待鎖
    } catch (KeeperException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }


  private boolean waitForUpLock(String lock) throws KeeperException, InterruptedException{
    Stat stat = this.zooKeeper.exists(lock, true);//監聽目前節點的上一個節點
    if(stat != null){
      System.out.println(Thread.currentThread().getName()+"->等待"+this.ROOT_LOCK+"/"+lock+"->釋放鎖");
      this.countDownLatch = new CountDownLatch(1);
      countDownLatch.await(); //線程等待
      System.out.println(Thread.currentThread().getName()+"->獲得鎖成功!");
    }
    return true;
  }


  @Override
  public void lockInterruptibly() throws InterruptedException {
    // TODO Auto-generated method stub

  }

  //嘗試獲得鎖
  @Override
  public boolean tryLock() {
    // TODO Auto-generated method stub
    try {
      //對目前節點進行 指派 
      this.CURRENT_LOCK = this.zooKeeper.create(this.ROOT_LOCK+"/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

      System.out.println(Thread.currentThread().getName()+"->"+this.CURRENT_LOCK+",嘗試競争鎖資源");
      //擷取根節點下的所有子節點
      List<String> children = this.zooKeeper.getChildren(this.ROOT_LOCK, false);
      //對根節點下的子節點進行排序 此方法 可以實作排序功能 從小到大進行排序
      SortedSet<String> sortedSet = new TreeSet<String>();
      for (String child : children) {
        sortedSet.add(this.ROOT_LOCK+"/"+child);
      }
      // 擷取最小的節點
      String firstNode = sortedSet.first();
      //傳回比目前節小的節點集合
      SortedSet<String> lessThanToElement = sortedSet.headSet(this.CURRENT_LOCK);
//      for (String str : lessThanToElement) {
//        System.out.println("lessThanToElement->"+str);
//      }
      //判斷目前節點 是否與目前建立的節點相等 則 為最小節點
      if(this.CURRENT_LOCK.equals(firstNode)){
        //目前為最小節點
        return true;
      }
      //如果集合不為空 擷取比目前節點 CURRENT_LOCK 更小的最後一個節點 設定給 WAIT_UP_LOCK
      if(!lessThanToElement.isEmpty()){
        this.WAIT_UP_LOCK = lessThanToElement.last();
      }


    } catch (KeeperException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    return false;
  }

  @Override
  public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
    // TODO Auto-generated method stub
    return false;
  }

  @Override
  public void unlock() {
    // TODO Auto-generated method stub
    System.out.println(Thread.currentThread().getName()+"->釋放鎖"+this.CURRENT_LOCK);
    try {
      this.zooKeeper.delete(this.CURRENT_LOCK, -1);
      this.CURRENT_LOCK = null;
      this.zooKeeper.close();
    } catch (InterruptedException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    } catch (KeeperException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

  @Override
  public Condition newCondition() {
    // TODO Auto-generated method stub
    return null;
  }

}      
package com.julong.lock;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Demo driver: starts several threads that each create their own {@link DistributedLock},
 * acquire it, and release it again.
 */
public class DistributedLockTest {

  /** Number of competing worker threads. */
  private static final int THREAD_COUNT = 10;

  /**
   * Starts the workers and then blocks on standard input so the JVM stays alive.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    // Start gate: counted down once per started thread, so the workers begin competing together.
    final CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);

    for (int i = 0; i < THREAD_COUNT; i++) {
      new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            countDownLatch.await();
          } catch (InterruptedException e) {
            // Restore the flag and give up instead of competing for the lock anyway.
            Thread.currentThread().interrupt();
            return;
          }

          DistributedLock distributedLock = new DistributedLock();
          distributedLock.lock();
          try {
            // Critical section: work that must run on one thread/process at a time goes here.
          } finally {
            // Without this release no other contender could ever obtain the lock.
            distributedLock.unlock();
          }
        }
      }).start();
      countDownLatch.countDown();
    }

    try {
      System.in.read();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

}
Thread-6->/locks/0000000012,嘗試競争鎖資源
Thread-4->/locks/0000000016,嘗試競争鎖資源
Thread-3->/locks/0000000013,嘗試競争鎖資源
Thread-7->/locks/0000000014,嘗試競争鎖資源
Thread-0->/locks/0000000011,嘗試競争鎖資源
Thread-9->/locks/0000000010,嘗試競争鎖資源
Thread-5->/locks/0000000018,嘗試競争鎖資源
Thread-4->等待/locks//locks/0000000015->釋放鎖
Thread-3->等待/locks//locks/0000000012->釋放鎖
Thread-6->等待/locks//locks/0000000011->釋放鎖
Thread-7->等待/locks//locks/0000000013->釋放鎖
Thread-0->等待/locks//locks/0000000010->釋放鎖
Thread-2->/locks/0000000019,嘗試競争鎖資源
Thread-8->/locks/0000000017,嘗試競争鎖資源
Thread-1->/locks/0000000015,嘗試競争鎖資源
Thread-9->/locks/0000000010->節點獲得鎖成功!
Thread-1->等待/locks//locks/0000000014->釋放鎖
Thread-2->等待/locks//locks/0000000018->釋放鎖
Thread-8->等待/locks//locks/0000000016->釋放鎖
Thread-5->等待/locks//locks/0000000017->釋放鎖