天天看点

Zookeeper入门

Zookeeper的理解

Zookeeper是开源的高性能的分布式应用协调系统,

一个高性能的分布式数据一致性解决方案

为什么使用Zookeeper

  • 集群,可靠,解决了单机不可靠的问题
  • 为了保持可靠性,当信息还没有同步完成时,不对外提供服务,提供的数据是最新且准确的
  • 同步的时间比较短,分布式协调好,好像一个“单机”一样

Zookeeper的特点

  • 顺序一致性:发送消息的的顺序一致
  • 原子性:集群内同步消息时的原子性
  • 单一视图:在一个集群中无论连接哪一个,看到的都是一致的
  • 可靠性:直到被改变都保持不变
  • 及时性:一定时间内保证客户端能从服务器拿到最新状态

当client发送一个请求时,由从节点发给leader处理,然后同步给其他节点

CAP

  • 一致性 C
  • 可用性 A
  • 分区容错性:可理解为网络错误,一定会发生 P

Zookeeper和CAP的关系

CP:一致性+分区容错性

能得到一致的数据结果,同时系统对网络具备容错性

但是它不能保证每次服务请求的可用性,网络不好时可能会丢弃一些请求

作用

  • 分布式服务注册与订阅
  • 统一配置文件:当一个配置修改后可以通过Watcher机制进行通知
    Zookeeper入门
  • 生成分布式唯一ID:顺序节点
    Zookeeper入门
  • Master节点选举:创捷有编号的节点,且不能重复
    Zookeeper入门
  • 分布式锁

目的:数据的最终一致性

访问资源获得锁,判断锁是否已经被占用(是否已经有了一个事先约定好的节点),如果已被占用,等待释放await(CountDownLatch)、如果没被占用,创建zk节点,节点使用同一个名字,要是非持久性的锁、当前线程拥有该锁、业务处理完毕,释放锁。

Linux安装

解压:tar zxvf apache-zookeeper-3.7.0-bin.tar.gz

注:不进行配置初次运行会报错

  • 配置:cd apache-z ookeeper-3.7.0-bin cp conf/zoo_sample.cfg conf/zoo.cfg

vi conf/zoo.cfg 把内容修改为:                   

 tickTime=2000 默认

dataDir=/var/lib/zookeeper

clientPort=2181 默认

  • 启动:./bin/zkServer.sh start
  • 停止:./bin/zkServer.sh stop

注:启动和停止都要在Zookeeper的bin目录下

windows 下安装

  • 下载 zookeeper

    网址

    https://zookeeper.apache.org/releases.html 下载 最新版本
  • 解压 zookeeper

    解压运行 zkServer.cmd ,初次运行会报错,没有 zoo.cfg 配置文件

  • 修改 zoo.cfg 配置文件

    将 conf 下的 zoo_sample.cfg 复制一份改名为 zoo.cfg 即可。

注意几个重要位置:

dataDir=./ 临时数据存储的目录(可写相对路径) clientPort=2181 zookeeper 的端口号2181

修改完成后再次启动 zookeeper,运行 zkServer.cmd

节点znode

是一个树结构:类似linux的文件系统

Zookeeper入门
  • 每一个节点都是znode,里面可以包含数据,也可以有子节点
  • 节点可以分为永久节点和临时节点(session失效,也就是客户端断开后,临时节点消失)
  • 每个znode都有版本号,每当数据变化,版本号会累加(乐观锁)

    使用中删除或者修改节点,版本号不匹配报错

  • 节点存储的数据不宜过大  ,会降低性能。比如可以存redis的key或者文件的路径
  • 节点可以设置权限,来限制用户的访问
  • 读和写都是原子操作,每次都是对数据的完整读取和完整写入

节点类型:创建时就已经确定,不可更改类型

  • 持久节点 PERSISTENT
  • 临时节点:PERSISTENT_SEQUENTIAL 维护服务发现
  • 顺序节点:EPEMERAL 持久和临时节点都可以时顺序或者非顺序的(生成唯一ID)

节点版本属性

  • dataVersion  : 当数据更改时+1
  • cversion:当子节点改变+1
  • aclVersion:当权限改变+1

常用命令linux下

  • 启动: ./bin/zkServer/.sh start
  • 连接Server: ./bin/zkCli.sh -server 127.0.0.1 2181
  • 查看子节点:ls path
  • 查看节点状态: stat path
  • 查看节点的数据  get path
  • 创建修改删除节点:

    create [-s] [-e] path [data]  不支持创建多级目录   -s 顺序 -e 临时

    set [-s] [-v version]  path data   -v version 乐观锁(多线程操作时)

    delete [-v version] path  有子节点不允许删除

Watcher机制

  • 触发器。监督者:当客户端注册以后,如果触发了Watcher机制,zookeeper将自动进行对客户端的通知,而不是需要客户端对服务端进行轮询确认。
    Zookeeper入门
  • 使用场景:统一资源配置
  • Watcher的监听类型如下:
    Zookeeper入门

ACL权限控制

  • access control list权限控制,类似shiro
  • 它使用权限位来允许/禁止对节list点及其所作用域的各种操作
  • ACL仅与特定的znode有关,与子节点无关

ACL权限控制格式:

[scheme采用的权限机制:id用户:permissions权限组合字符串]

权限机制:world、auth(明文)、digest(加密)、ip(常用)、super(最高权限)

权限字符串crdwa

Create、Read、Delete、Write、Admin(最高)

使用场景:

  • 区分开发,测试,运维环境,防止误操作
  • 可以针对不同的IP而产生具体的配置,更安全

Demo

原生Java的Api的缺点

  • 不支持连接超时后的自动重连
  • Watcher注册一次后会失效,触发后
  • 不支持递归创建节点

使用Apache Curator工具类

  • 解决Watcher注册一次后失效的问题
  • API更加简单易用

1.引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
  </dependency>      

2.测试demo

/**
 * 描述:     用Curator来操作ZK
 */
public class CuratorTests {
    public static void main(String[] args) throws Exception {
        // 配置zookeeper地址
        String connectString = "127.0.0.1:2181";
        // 节点路径
        String path = "test/curator";
        // 连接重试策略
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retry);
        // 启动 服务
        client.start();
        // 设置watcher 监听机制
        client.getCuratorListenable().addListener((CuratorFramework c, CuratorEvent event) -> {
            switch (event.getType()) {
                case WATCHED:
                    WatchedEvent watchedEvent = event.getWatchedEvent();
                    if (watchedEvent.getType() == EventType.NodeDataChanged) {
                        System.out.println(new String(c.getData().forPath(path)));
                    }
            }
        });
        String data = "test";
        String data2 = "test2";
        // 创建一个节点
        client.create().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
        // 获取节点数据
        byte[] bytes = client.getData().watched().forPath(path);
        System.out.println(new String(bytes));
        // 修改一个节点
        client.setData().forPath(path, data2.getBytes());
        // 删除一个节点
        client.delete().forPath(path);
        Thread.sleep(2000);
    }
}