天天看點

03-zookeeper基礎1. zookeeper用戶端指令操作2. zookeeper内部原理3.API操作

文章目錄

  • 1. zookeeper用戶端指令操作
  • 2. zookeeper内部原理
    • 2.1 持久化節點和臨時節點
    • 2.2 Stat結構體
    • 2.3 監聽原理
    • 2.4paxos算法
    • 2.5選舉機制
    • 2.6寫資料流程
  • 3.API操作
    • 3.1zk用戶端操作
    • 3.2動态上下線
    • 3.3同步線程鎖

1. zookeeper用戶端指令操作

1.啟動zookeeper用戶端
zkCli.sh
2.建立普通節點
create /iweb "jianhau"
3.擷取節點的值
get /iweb
4.建立短暫節點
create -e /iweb
6.監聽節點的變化
ls /iweb watch
5.退出
quit

           

[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-v9afaE3X-1570004773361)(11886AE15C5E4EE284904D287986A1BD)]

2. zookeeper内部原理

2.1 持久化節點和臨時節點

  • 持久化節點是當用戶端和zookeeper斷開連接配接後,該節點依舊存在
  • 臨時節點是當用戶端和zookeeper斷開連接配接後,節點自動删除

2.2 Stat結構體

[zk: localhost:2181(CONNECTED) 5] get /sanguo
jinlian
cZxid = 0x100000003       ==建立節點的事務id==               
ctime = Wed Aug 29 00:03:23 CST 2018 ==被建立的好毫秒數==
mZxid = 0x100000003 ==最後更新的事務==
mtime = Wed Aug 29 00:03:23 CST 2018 ==最後更改的毫秒數==
pZxid = 0x100000004 ==最後更新的子節點id==
cversion = 1 ==子節點修改次數==
dataVersion = 0 ==資料變化号==
aclVersion = 0  ==通路被控制清單的變化号==
ephemeralOwner = 0x0    ==如果是臨時節點,這個是擁有者的session id 如果不是臨時節點就是0==
dataLength = 7  ==資料長度==
numChildren = 1 ==子節點數量==

           

2.3 監聽原理

03-zookeeper基礎1. zookeeper用戶端指令操作2. zookeeper内部原理3.API操作
  • 首先建立一個main線程
  • 在main線程中建立一個用戶端,這是就會建立兩個線程connect,Listener,一負責通信,一個負責監聽
  • 通過connect注冊監聽
  • 在zookeeper的注冊監聽器清單中添加注冊的監聽事件
  • 監聽到有資料或者路徑變化時通過,就會将消息發送給listener線程
  • listener線程内部調用了process方法
常見的監聽
1.監聽資料
get path [watch]
2.監聽子節點的增減變化
ls path [watch]
           

2.4paxos算法

  1. 當節點發生變化後,會彙報給zkserver,此時zkserver收到一個zxid
  2. 如果zxid大于自己目前的zxid,先記錄下來,然後同步給其他的zkserver如果超過半數的zkserver同意後即生效,更新後同步給其他的zkserver,修改自己的zxid

2.5選舉機制

  1. 半數機制:叢集中半數以上的叢集存活,叢集可用,是以安裝奇數台伺服器
  2. 雖然在配置檔案中沒有指定的主從,但是會選舉産生一個leader和其他的follower
  3. 如果有五台機器
  • 第一台上線後先投自己一票
  • 第二台上線後,先頭自己一票,由于他的id比第一台的大,是以第一台改投第二台,此時第一台0票,第二台2票,但是少于3,仍然不能成為leader
  • 第三台上線後,此時伺服器都會改選票給伺服器3,此時他又三票,成為leader,其他狀态為follower
  • 第四台啟動,123不會交換選票資訊,第四台隻有一票少數服從多數

2.6寫資料流程

  1. client向server1發送寫資料請求,如果server1不是leader,那麼server1會把請求轉發給leader
  2. leader把寫請求廣播給follow
  3. follow傳回資訊并把請求放入待寫隊列中,并傳回成功資訊
  4. 當leader收到半數以上的成功資訊後,說明該寫操作可以執行
  5. leader向各個server發送送出資訊,各個server收到後落實寫請求,操作成功
  6. 傳回給用戶端操作成功

3.API操作

3.1zk用戶端操作

public class Zkutils {
    private static int Session_Time_Out = 300000;
    private static ZooKeeper zk  = null;
//建立節點
    public static void CreateNodes() throws KeeperException, InterruptedException {
        String path = "/test";
        byte [] bytes = "hello zk ".getBytes();
        String result = zk.create(path, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(result);
	}
	//判斷節點是否存在
    public static void NodeExist()throws Exception{
        String path ="/test";
        Stat stat = zk.exists(path, false);
        System.out.println(stat);

    }
    //擷取資料
    public static void getData() throws KeeperException, InterruptedException {
        String path = "/test";
        Stat stat = new Stat();
        byte[] data = zk.getData(path, false, stat);
        System.out.println(new String(data, Charset.forName("UTF-8")));
        }
        //存儲資料
    public static void setData() throws KeeperException, InterruptedException {
        String path ="/test";
        Stat stat = zk.setData(path, "hellozk".getBytes(), -1);
        System.out.println(stat);

    }
    //删除節點
    public static void DeleteNode() throws KeeperException, InterruptedException {
        String path ="/test";
        zk.delete(path,-1);
    }
	//測試
    public static void main(String[] args) {
        try {
            zk = new ZooKeeper("bigdata1:2181,bigdata2:2181,bigdata3:2181", Session_Time_Out, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.toString());
            }
        });
//            CreateNodes();
//            setData();
//            getData();
            DeleteNode();
            NodeExist();
        } catch (IOException  e) {

            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
           

3.2動态上下線

public class DynamicUpDown {
    private ZooKeeper zk = null;
    private static int SESSION_TIME_OUT = 300000;
    private static String HOSTS = "bigdata1:2181,bigdata2:2181,bigdata3:2181";
    private static List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    private static String PRAENT_NODE = "/hosts";
    private List<String> serverlist = new ArrayList<>();


    public  void Init() throws Exception{
        zk = new ZooKeeper(HOSTS, SESSION_TIME_OUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
               String path = watchedEvent.getPath();
                Event.EventType type = watchedEvent.getType();
                //列印出目前監聽事件類型
                System.out.println(type);
                //判斷子節點的變化
                if (Event.EventType.NodeChildrenChanged==type&&path.equals(PRAENT_NODE)){
                    try {
                    //更新清單
                        UpdateServerList();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

            }
        });
        //建立父節點
        Stat stat = zk.exists(PRAENT_NODE, false);
        if (stat == null){
            zk.create(PRAENT_NODE,"父節點".getBytes(),ACL, CreateMode.PERSISTENT);
        }
            UpdateServerList();
    }
    private void UpdateServerList() throws Exception{
        List<String> newServerList = new ArrayList<>();
        List<String> children = zk.getChildren(PRAENT_NODE,true);
        children.forEach(child->{
            try {
            //擷取目前子節點的目錄
                byte[] data = zk.getData(PRAENT_NODE + "/" + child, false, null);
                newServerList.add(new String(data));
                serverlist = newServerList;
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        });
        serverlist.forEach(list->{
            System.out.println(list);
        });
	}
	//關閉資源
    public void close()  {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        DynamicUpDown server = new DynamicUpDown();
        try {
            server.Init();
            Thread.sleep(Long.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            server.close();

        }
    }
}

           

3.3同步線程鎖

//抽取任務接口
public interface CustomTask {
    void doSomething();
}
           
//定義自己的任務
public class Mytask implements CustomTask{
    private String name;
    public Mytask(String name){
        this.name =name;
    }
    @Override
    public void doSomething() {
        for (int i = 1;i<=5;i++){
            System.out.println("做事情"+i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

           
/*
指定個數的用戶端通路伺服器的資源
1.上線就向zookeeper用戶端注冊
2.判斷是否隻有一個用戶端工作,若隻有一個,便可以處理業務
3.擷取父節點下注冊的所有的鎖,通過判斷自己是否是号碼最小的那把鎖,如果是則可以處理業務
 */
public class DistributeLock {
    private ZooKeeper zk= null;
    private CustomTask task = null;
    private final List<org.apache.zookeeper.data.ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
    private final int sessionTimeOut = 5000;
    private final String parent_node = "/locks";
    private String connectstring = null;
    private volatile String currentPath = null;
    public DistributeLock(String connectstring){
        this.connectstring = connectstring;
    }
    public DistributeLock(){
        this("bigdata1:2181,bigdata2:2181,bigdata3:2181");
    }
    public void setTask(CustomTask task){
        this.task = task;
    }
	//擷取用戶端
    public void getClient() throws Exception{
        zk =new ZooKeeper(connectstring,sessionTimeOut,event->{
        //監聽子節點的變化
            if (event.getType()==Watcher.Event.EventType.NodeChildrenChanged&&
                    event.getPath().equals(parent_node)){
                try {
                    //拿到所有的子節點
                    List<String> child = zk.getChildren(parent_node,true);
                    //判斷自己是否是最小的節點
                    String currentNode = currentPath.substring(parent_node.length() + 1);
                    //排序
                    Collections.sort(child);
                    if (child.indexOf(currentNode)==0){
                        task.doSomething();
                        //釋放鎖
                        deleteLock();
                        //注冊新鎖
                        registerLock();
                        }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
 });
    }
    //注冊鎖
    public void registerLock()throws Exception{
    //使用CreateMode.EPHEMERAL_SEQUENTIAL臨時順序型
        currentPath = zk.create(parent_node+"/lock",null,ACL,CreateMode.EPHEMERAL_SEQUENTIAL);
	}
	//判斷是否隻有一個節點線上,若隻有自己一個節點,則調用業務處理的方法
    public void watchParent() throws Exception{
        List<String> children = zk.getChildren(parent_node, false);
        if (children!=null&&children.size()==1){
            task.doSomething();
            deleteLock();
        }else {
            Thread.sleep(Long.MAX_VALUE);
        }
    }
    public void deleteLock() throws Exception{
        zk.delete(currentPath,-1);
    }

}

           
//測試
public class Test {
    public static void main(String[] args)  throws Exception{
        //擷取用戶端連接配接
        DistributeLock distributeLock = new DistributeLock();
        CustomTask customTask = new Mytask(UUID.randomUUID().toString());
        //設定任務
		distributeLock.setTask(customTask);
        distributeLock.getClient();
        //注冊鎖 
		distributeLock.registerLock();
		//監聽父節點
		distributeLock.watchParent();

    }
}

           

繼續閱讀