天天看點

Java學習筆記分享之Zookeeper篇

1.1 Zookeeper簡介

Zookeeper是Apache開源的緻力于開發和維護實作高可用、高可靠的分布式協調伺服器。Zookeeper提供了一種集中式服務,用于維護配置資訊、服務命名、提供分布式同步以及提供組服務。這一類的服務如果沒有統一的管理,那麼就在分布式應用運作過程中,會導緻不可避免的錯誤和競争條件。

1.2 Zookeeper安裝

Zookeeper的安裝有單機、叢集和僞叢集三種類型。在開發中隻需要搭建單機就可以,但是在生産環境一般為了避免單點故障都會部署叢集或者僞叢集模式。

由于Zookeeper采用過半投票機制政策,是以叢集至少需要保證有3個以上的伺服器。那麼叢集和僞叢集有什麼差別呢。

叢集:在多個獨立的實體伺服器上各自部署Zookeeper,這樣的好處是單台實體伺服器故障不會導緻整體受影響。但是也增加了網絡通信的問題。

僞叢集:僞叢集和叢集其實差不多,隻是在同一台實體伺服器上部署多個Zookeeper,這樣的确定就是如果實體伺服器故障,将導緻整體不可用。

以下示範的安裝是以 Linux環境、CentOS 7系統為基礎,那麼在使用之前我們需要做一下配置,如果有環境并且已經處理了可以忽略

JDK安裝問題,Zookeeper是Java語言編寫的,是以安裝之前需要確定Linux已經安裝了JDK并且可以正常使用

  • 使用java -version檢查是否有預設安裝的JDK,如果有需要解除安裝
  • 将linux的JDK安裝包tar.gz上傳到伺服器
  • tar -zxvf jdk-11.0.7_linux-x64_bin.tar.gz解壓
  • 配置JDK環境變量
    • vi /etc/profile打開環境配置檔案
#java environment
export JAVA_HOME=/usr/local/java/jdk1.8.0_201
export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$PATH:$JAVA_HOME/bin      
    • source /etc/profile讓配置檔案生效
    • java -version如果顯示jdk版本資訊就是安裝成功

防火牆問題,CentOS 7防火牆預設是開啟的,那麼如果不關閉端口就不可以通路,在代碼中調用的時候就無法使用預設的2181和其他端口

# CentOS 6處理方法
//臨時關閉
service iptables stop
//禁止開機啟動
chkconfig iptables off      
# CentOS 7處理方法,由于CentOS 7以後的防火牆預設使用firewalld,是以處理方法和CentOS 6有差異
//臨時關閉
systemctl stop firewalld
//禁止開機啟動
systemctl disable firewalld
Removed symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.
Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.sersvice.      
# 額外提供的指令
# 防火牆相關指令
1、檢視防火牆狀态 : systemctl status firewalld.service
注:active是綠的running表示防火牆開啟
2、關閉防火牆 :systemctl stop firewalld.service
3、開機禁用防火牆自啟指令 :systemctl disable firewalld.service
4、啟動防火牆 :systemctl start firewalld.service
5、防火牆随系統開啟啟動 : systemctl enable firewalld.service
6、重新開機防火牆 : firewall-cmd --reload
# 端口開放指令
1、查詢已經開放的端口 :firewall-cmd --list-port
2、查詢某個端口是否開放 :firewall-cmd --query-port=80/tcp
3、開啟端口 :firewall-cmd --zone=public --add-port=80/tcp --permanent
注:可以是一個端口範圍,如1000-2000/tcp
4、移除端口 :firewall-cmd --zone=public --remove-port=80/tcp --permanent
5、指令含義:
    --zone #作用域
    --add-port=80/tcp #添加端口,格式為:端口/通訊協定
    --remove-port=80/tcp #移除端口,格式為:端口/通訊協定
    --permanent #永久生效,沒有此參數重新開機後失效      

1.2.1 單機版

  • 下載下傳Zookeeper http://zookeeper.apache.org/releases.html 打開這個網址就可以看到不同的版本直接下載下傳即可
  • 在/usr/local/目錄下建立zookeeper檔案夾,用于存放zookeeper。

    mkdir zookeeper

  • 将下載下傳的zookeeper.tar.gz壓縮包上傳到Linux的/usr/local/zookeeper目錄下
# 解壓
tar -zxvf zookeeper-3.4.14.tar.gz      
# 進入conf目錄,zookeeper啟動是讀取zoo.cfg配置檔案,是以重命名或者拷貝一份都可以
cp zoo_sample.cfg zoo.cfg      
# 在zookeeper目錄下建立data檔案夾,用于存儲檔案,不使用預設的路徑檔案夾
mkdir data
# 修改conf/zoo.cfg配置檔案的dataDir路徑,這個是資料存放的檔案路徑
dataDir=/usr/local/zookeeper/data      
# 指令
./bin/zkServer.sh start  # 啟動
./bin/zkServer.sh stop   # 暫停
./bin/zkServer.sh status # 檢視狀态      
# 啟動指令執行的效果
[root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED      
# 檢視狀态指令執行的效果
[root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg
Mode: standalone  # standalone表示單機版本      
# 暫停指令執行的效果
[root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh stop
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED      

1.2.2 叢集版

  • 下載下傳 打開這個網址就可以看到不同的版本直接下載下傳
  • 上傳zookeeper.tar.gz壓縮包到/usr/local/zookeeper目錄下
# 在/usr/local/目錄下建立檔案夾
mkdir zkcluster      
# 解壓zookeeper到zkcluster檔案夾,-C的作用是指定解壓到那個檔案夾下
tar -zxvf zookeeper-3.4.14.tar.gz -C /zkcluster      
# 修改檔案名稱,并複制
mv zookeeper-3.4.14 zookeeper01
# 複制2份,叢集數量為3
cp -r zookeeper01/ zookeeper02
cp -r zookeeper01/ zookeeper03      
# 在每個zookeeper目錄下建立data檔案夾,并在data目錄下建立log檔案夾
mkdir data
cd data
mkdir log      
# 修改/conf/zoo_sample.cfg配置檔案名 zookeeper01  zookeeper02  zookeeper03都要執行
cp zoo_sample.cfg zoo.cfg      
# 修改zoo.cfg配置檔案的端口的資料存儲就以及日志路徑,端口分别是:2181 2182 2183
# 端口
clientPort=2181
# 存儲路徑
dataDir=/usr/local/zookeeper/zkcluster/zookeeper01/data
# 日志路徑
dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper01/data/log
clientPort=2182
dataDir=/usr/local/zookeeper/zkcluster/zookeeper02/data
dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper02/data/log
clientPort=2183
dataDir=/usr/local/zookeeper/zkcluster/zookeeper03/data
dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper03/data/log      
# 配置叢集,分别在data目錄下建立myid,内容分别是1 2 3用于記錄每個伺服器的ID,也是叢集中zookeeper的唯一标記,不可以重複
touch ./zookeeper01/data/myid
touch ./zookeeper02/data/myid
touch ./zookeeper03/data/myid      
#在每個zookeeper的zoo.cfg檔案中配置用戶端通路端口(clientPort)和叢集伺服器IP清單
#server.伺服器ID=伺服器IP位址:伺服器之間通信端口:伺服器之間投票選舉端口
server.1=192.168.247.100:2881:3881
server.2=192.168.247.100:2882:3882
server.3=192.168.247.100:2883:3883      
# 依次啟動3個zookeeper,相關指令如下
./bin/zkServer.sh start
./bin/zkServer.sh stop
./bin/zkServer.sh status      

1.3 基本使用指令

啟動後打開用戶端

./bin/zkCli.sh

如果要連接配接的是遠端的zookeeper,那麼使用

./bin/zkCli.sh -server ip:port

連接配接指定的伺服器

1.3.1 建立節點

create指令用于建立一個zookeeper節點

create [-s][-e] path data
其中,-s或-e分别指定節點特性,順序或臨時節點,若不指定,則建立持久節點;      

在執行建立節點之前,可以使用

ls /

指令,那麼就會顯示

[zookeeper]

這個是自動生成的持久節點。

節點需要一級一級的建立,不可以一下子建立多級

建立持久節點

[zk: localhost:2181(CONNECTED) 5] create /zk-seq 123
Created /zk-seq
[zk: localhost:2181(CONNECTED) 6] ls / 
# zk-seq已經建立好
[zk-seq, zookeeper]      

建立持久順序節點

注意:

  • 順序節點建立後zookeeper預設會在節點名稱後面拼接序列,這個數字序列用于标記節點的順序。
  • 順序節點的特點就是節點後面會拼接一串數字表示和非順序節點的差別。
[zk: localhost:2181(CONNECTED) 3] create -s /zk-test 123
Created /zk-test0000000000
[zk: localhost:2181(CONNECTED) 4] ls /
# zk-test已經建立好
[zk-test0000000000, zookeeper]      

建立臨時節點

  • 臨時節點的特點是如果會話斷開連接配接,那麼節點就會自動被zookeeper删除。
[zk: localhost:2181(CONNECTED) 7] create -e /zk-temp 123
Created /zk-temp
[zk: localhost:2181(CONNECTED) 8] ls /
# zk-temp已經建立好
[zookeeper, zk-temp]
# 退出關閉會話,然後重新登入看臨時節點是否還存在
[zk: localhost:2181(CONNECTED) 9] quit
Quitting...
2020-08-08 15:29:51,404 [myid:] - INFO  [main:ZooKeeper@693] - Session: 0x1000009d3a80000 closed
2020-08-08 15:29:51,407 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@522] - EventThread shut down for session: 0x1000009d3a80000
# 重新登入後執行ls指令,可以看到臨時節點/zk-temp已經被删除
[zk: localhost:2181(CONNECTED) 0] ls /      

建立臨時順序節點

[zk: localhost:2181(CONNECTED) 1] create -s -e /zk-temp 123456
Created /zk-temp0000000003
[zk: localhost:2181(CONNECTED) 2] ls /
[zk-temp0000000003, zookeeper]      

1.3.2 讀取節點

讀取節點指令主要有

ls path

get path

。ls 指令可以列出zookeeper指定節點的所有子節點,但是隻可以檢視子一級目錄。get指令可以擷取zookeeper指定節點的資料和屬性資訊。

[zk: localhost:2181(CONNECTED) 4] get /zk-temp0000000003 # 擷取臨時順序節點的資訊
123456 # 節點資料内容
cZxid = 0x7 # 建立時候的事務ID
ctime = Sat Aug 08 15:32:00 CST 2020 # 建立時間
mZxid = 0x7 # 修改時候的事務ID
mtime = Sat Aug 08 15:32:00 CST 2020 # 修改時間
pZxid = 0x7 # 最新修改的zxid
cversion = 0 # 子節點被修改版本
dataVersion = 0 # 資料版本
aclVersion = 0 # acl權限控制版本
ephemeralOwner = 0x1000009d3a80001
dataLength = 6 # 資料長度
numChildren = 0 # 子節點數      

1.3.3 更新節點

更新節點使用

set path data [version]

指令。一般不需要指定version版本。

# 以下指令非核心列印的資訊省略了
[zk: localhost:2181(CONNECTED) 6] get /zk-seq # 擷取節點資料
123 # 資料為123
[zk: localhost:2181(CONNECTED) 7] set /zk-seq 456 # 更新節點資料為456
[zk: localhost:2181(CONNECTED) 8] get /zk-seq # 擷取節點資料
456 # 資料已經被更新為456      

1.3.4 删除節點

删除節點使用

delete path [version]

指令。

[zk: localhost:2181(CONNECTED) 0] ls / # 檢視節點
[zk-seq, zk-test0000000000, zookeeper]
[zk: localhost:2181(CONNECTED) 1] delete /zk-seq # 删除/zk-seq節點
[zk: localhost:2181(CONNECTED) 2] ls / # 檢視節點/zk-seq節點已經不存在
[zk-test0000000000, zookeeper]      

1.4 基本API操作Zookeeper

Zookeeper作為一個分布式架構,主要用于解決分布式一緻性問題。是以提供了基于各種版本的原生API,那麼下面我們就學習一下Zookeeper的Java API如何操作Zookeeper。

引入Jar

<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependesncy>      

1.4.1 建立會話

在進行Zookeeper的節點操作之前,我們需要和Zookeeper建立一個會話連接配接的過程。但是Zookeeper的原生API進行會話連接配接是異步操作的,如果你和Zookeeper建立連接配接,那麼Zookeeper會立刻傳回給你。但是最終連接配接成功後會異步的通知。

// 類實作Watcher接口,Zookeeper基于這個異步通知
public class CreateSession implements Watcher {
    // 使用同步操作阻塞等待Zookeeper異步的通知
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    public static void main(String[] args) throws IOException, InterruptedException {
        // 參數一:Zookeeper的ip:port
        // 參數二:連接配接逾時時間
        // 參數三:異步通知
        ZooKeeper zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new CreateSession());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
        System.out.println("zookeeper會話建立成功......");
    }
    // 實作異步通知回調的方法
    @Override
    public void process(WatchedEvent watchedEvent) {
        // 異步通知的事件類型有很多,隻有是建立會話成功的才放行,其他的不處理
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
    }
}      

1.4.2 建立節點

public class CreateNode implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new CreateNode());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
        System.out.println("zookeeper會話建立成功......");
        // 建立節點
        createNodeSync();
        Thread.sleep(Integer.MAX_VALUE);
    }
    /**
     * 建立節點
     * 節點的類型:
     *  CreateMode.PERSISTENT 持久節點
     *  CreateMode.PERSISTENT_SEQUENTIAL 持久順序節點
     *  CreateMode.EPHEMERAL 臨時節點
     *  CreateMode.EPHEMERAL_SEQUENTIAL 臨時順序節點
     */
    private static void createNodeSync() throws Exception {
        // 參數一:要建立的節點,一次隻可以建立一個子節點,不可以建立多級
        // 參數二:節點的内容,是一個位元組數組
        // 參數三:節點的權限類型
        // 參數四:節點的類型,這是持久節點
        String s = zooKeeper.create("/lg_persistent", "持久節點内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        String s1 = zooKeeper.create("/lg_persistent_sequential", "持久順序節點内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        String s2 = zooKeeper.create("/lg_ephemeral", "臨時節點内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        String s3 = zooKeeper.create("/lg_ephemeral_sequential", "臨時順序節點内容".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("建立的持久節點是:" + s);
        System.out.println("建立的持久順序節點是:" + s1);
        System.out.println("建立的臨時節點是:" + s2);
        System.out.println("建立的臨時順序節點是:" + s3);
    }
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
    }
}      

1.4.3 擷取節點資料

public class GetNodeData implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;
    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new GetNodeData());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
        System.out.println("zookeeper會話建立成功......");
        // 擷取節點内容
        getNodeData();
        // 擷取所有的節點
        getChildrens();
        Thread.sleep(Integer.MAX_VALUE);
    }
    private static void getChildrens() throws Exception {
        /*
            參數一:path節點路徑
            參數二:是否要監聽,如果子節點變化會觸發監聽
         */
        List<String> children = zooKeeper.getChildren("/lg_persistent", true);
        System.out.println("/lg_persistent子節點為:" + children);
    }
    private static void getNodeData() throws Exception {
        /**
         * 參數一:要擷取内容的節點path
         * 參數二:是否監聽
         * 參數三:版本,不填預設最新版本
         */
        byte[] data = zooKeeper.getData("/lg_persistent", true, null);
        System.out.println("擷取到的節點内容為:" + new String(data));
    }
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
        // 當位元組點變化的時候,觸發
        if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
            List<String> children = null;
            try {
                // true參數繼續注冊監聽
                children = zooKeeper.getChildren(watchedEvent.getPath(), true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(children);
        }
    }
}      

1.4.4 修改節點資料

public class UpdateNodeData implements Watcher {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    static ZooKeeper zooKeeper;
    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new UpdateNodeData());
        System.out.println(zooKeeper.getState());
        countDownLatch.await();
        System.out.println("zookeeper會話建立成功......");
        // 修改節點内容
        updateNodeData();
        Thread.sleep(Integer.MAX_VALUE);
    }
    private static void updateNodeData() throws Exception {
        // 修改前的節點資料
        byte[] data = zooKeeper.getData("/lg_persistent/c1", true, null);
        System.out.println("修改前的資料為:" + new String(data));
        // 節點資料修改, 版本為-1表示更新最新版本資料
        zooKeeper.setData("/lg_persistent/c1", "節點修改後的資料".getBytes("UTF-8"), -1);
        // 修改後的節點資料
        byte[] data1 = zooKeeper.getData("/lg_persistent/c1", true, null);
        System.out.println("修改後的資料為:" + new String(data1));
    }
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            countDownLatch.countDown();
        }
    }
}      

1.4.5 删除節點

public class DeleteNode implements Watcher {
    static ZooKeeper zooKeeper;
    public static void main(String[] args) throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper("192.168.247.100:2181", 5000, new DeleteNode());
        System.out.println(zooKeeper.getState());
        System.out.println("zookeeper會話建立成功......");
        Thread.sleep(Integer.MAX_VALUE);
    }
    @Override
    public void process(WatchedEvent watchedEvent) {
        // 連接配接成功後删除節點
        try {
            Stat exists = zooKeeper.exists("/lg_persistent/c1", false);
            System.out.println("節點是否存在:" + exists);
            // 删除節點
            zooKeeper.delete("/lg_persistent/c1", -1);
            Stat exists1 = zooKeeper.exists("/lg_persistent/c1", false);
            System.out.println("節點是否存在:" + exists1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}      

1.5 開源用戶端操作Zookeeper

Zookeeper的開源用戶端有很多,這裡隻列舉ZkClient進行示範。開源用戶端對Zookeeper的原生API進行了大量的封裝,使得我們使用起來非常的友善。

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>      

1.5.1 建立會話

public class CreateSession {
    public static void main(String[] args) {
        /**
         * 這個建立會話的過程已經同步化
         * 參數就是zookeeper的ip:port
         */
        ZkClient zkClient = new ZkClient("192.168.247.100:2181");
        System.out.println("用戶端會話建立成功");
    }
}      

1.5.2 建立節點

public class CreateNode {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("192.168.247.100:2181");
        System.out.println("用戶端會話建立成功");
        /**
         * 方法名稱:
         *  createPersistent 建立持久節點
         *  createEphemeral 建立臨時節點
         *  createPersistentSequential 建立持久順序節點
         *  createEphemeralSequential 建立臨時順序節點
         * 參數一:建立的節點path
         * 參數二:是否建立父節點,如果/lg_zkclient不存在,那麼就建立。
         */
        zkClient.createPersistent("/lg_zkclient/c1", true);
        System.out.println("節點建立成功");
    }
}      

1.5.3 擷取/更新節點資料

public class GetNodeData {
    public static void main(String[] args) throws InterruptedException {
        ZkClient zkClient = new ZkClient("192.168.247.100:2181");
        System.out.println("用戶端會話建立成功");
        // 判斷節點是否存在
        boolean exists = zkClient.exists("/lg_zkclient");
        System.out.println("節點是否存在:" + exists);
        // 擷取節點資料
        Object readData = zkClient.readData("/lg_zkclient");
        System.out.println(readData);
        // 建立監聽事件,監聽節點資料變更
        zkClient.subscribeDataChanges("/lg_zkclient", new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {
                System.out.println(s + "節點的資料變更了, " + o);
            }
            @Override
            public void handleDataDeleted(String s) throws Exception {
                System.out.println(s + "節點被删除了");
            }
        });
        
        // 更新節點資料
        zkClient.writeData("/lg_zkclient", "更新後的節點資料");
        Thread.sleep(Integer.MAX_VALUE);
    }
}      

1.5.4 删除節點

public class DeleteNode {
    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("192.168.247.100:2181");
        System.out.println("用戶端會話建立成功");
        // 删除節點
        /*
            方法說明:
                delete 删除單個節點
                deleteRecursive 循環删除節點
         */
        zkClient.deleteRecursive("/lg_zkclient/c1");
        System.out.println("節點循環删除子成功");
    }
}