天天看点

32-分布式锁-zookeeper实现分布式锁

zk分布式锁的原理:

查看文章:zookeeper分布式锁实现

我们通过去创建zk的一个临时node,来模拟给一个商品id加锁

zk会保证只会创建一个临时node,其他请求过来如果再要创建临时node,就会报错,NodeExistsException

那么说我们的所谓上锁,其实就是去创建某个product id对应的一个临时node。

如果临时node创建成功了,那么说明我们成功加锁了,此时就可以去执行对redis刷新数据的操作

如果临时node创建失败了,说明有人已经在拿到锁了,在操作reids中的数据,那么就不断的等待,直到自己可以获取到锁为止

基于zk client api,去封装上面的这个代码逻辑

释放一个分布式锁,去删除掉那个临时node就可以了,就代表释放了一个锁,那么此时其他的机器就可以成功创建临时node,获取到锁

即使是用zk去实现一个分布式锁,也有很多种做法,有复杂的,也有简单的

应该说,我演示的这种分布式锁的做法,是非常简单的一种,但是很实用,大部分情况下,用这种简单的分布式锁都能搞定

复杂得可以查看文章:https://blog.csdn.net/weixin_40663800/article/details/86595492。

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
</dependency>
           
/**
 * ZooKeeperSession
 * @author Administrator
 *
 */
public class ZooKeeperSession {

    // CountDownLatch是java多线程并发同步的一个工具类,会传递进去一些数字,比如说1,2 ,3 都可以
    // 然后await(),如果数字不是0,那么就卡住了等待
    // 其他的线程可以调用coutnDown(),就会减1,
    // 如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态
    // 继续向下运行
    private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
    private ZooKeeper zookeeper;

    /**
     * 封装单例的静态内部类
     * @author Administrator
     */
    private static class Singleton {
        private static ZooKeeperSession instance;
        static {
            instance = new ZooKeeperSession();
        }
        public static ZooKeeperSession getInstance() {
            return instance;
        }
    }
    /**
     * 获取单例
     * @return
     */
    public static ZooKeeperSession getInstance() {
        return Singleton.getInstance();
    }
    /**
     * 初始化单例的便捷方法
     */
    public static void init() {
        getInstance();
    }
    /**
     * 构造方法初始化zk连接
     */
    public ZooKeeperSession() {
        // 去连接zookeeper server,创建会话的时候,是异步去进行的
        // 所以要给一个监听器watcher,说告诉我们什么时候才是真正完成了跟zk server的连接
        try {
            this.zookeeper = new ZooKeeper(
                    "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181", 
                    50000, 
                    new ZooKeeperWatcher());
            // 给一个状态CONNECTING,连接中
            System.out.println(zookeeper.getState());
            try {
                    //await(),如果数字不是0,那么就卡住了等待
                connectedSemaphore.await();
            } catch(InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("ZooKeeper session established......");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /**
     * 建立zk session的watcher,需要自己建
     * @author Administrator
     *
     */
    private class ZooKeeperWatcher implements Watcher {
        public void process(WatchedEvent event) {
            System.out.println("Receive watched event: " + event.getState());
            if(KeeperState.SyncConnected == event.getState()) {
                // 调用coutnDown(),就会减1,如果数字减到0,那么之前所有在await的线程,都会逃出阻塞的状态
                System.out.println("Realease CountDownLatch Lock Thread event: " + event.getState());
                connectedSemaphore.countDown();
            } 
        }
        
    }

    
    /**
     * 获取分布式锁
     * @param productId
     */
    public void acquireDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
    
        try {
            // CreateMode.EPHEMERAL说明是临时的
            zookeeper.create(path, "".getBytes(), 
                    Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("success to acquire lock for product[id=" + productId + "]");  
        } catch (Exception e) {
            // 如果那个商品对应的锁的node,已经存在了,就是已经被别人加锁了,那么就这里就会报错
            // NodeExistsException
            int count = 0;
            while(true) {
                try {
                    Thread.sleep(20); 
                    zookeeper.create(path, "".getBytes(), 
                            Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (Exception e2) {
                    e2.printStackTrace();
                    count++;
                    continue;
                }
                System.out.println("success to acquire lock for product[id=" + productId + "] after " + count + " times try......");
                break;
            }
        }
    }
    
    /**
     * 释放掉一个分布式锁
     * @param productId
     */
    public void releaseDistributedLock(Long productId) {
        String path = "/product-lock-" + productId;
        try {
            zookeeper.delete(path, -1); 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
           

创建节点的方法: 

  • String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
  • void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)
path 需要创建的数据节点的节点路径,例如,/zk-book/foo
data[] 一个字节数组,是节点创建后的初始内容
acl 节点的ACL策略
createMode 节点类型,是一个枚举类型,通常有4种可选的节点类型
  • 持久(PERSISTENT)
  • 持久顺序(PERSISTENT_SEQUENTIAL)
  • 临时(EPHEMERAL)
  • 临时顺序(EPHEMERAL_SEQUENTIAL)
cb

注册一个异步回调函数。开发人员需要实现StringCallback接口,主要是对下面这个方法的重写:

void processResult(int rc, String path, Object ctx, String name);

当服务端节点创建完毕后,ZooKeeper客户端就会自动调用这个方法,这样就可以处理相关的业务逻辑了

ctx 用于传递一个对象,可以在回调方法执行的时候使用,通常是放一个上下文(Context)信息

需要注意几点,无论是同步还是异步接口,ZooKeeper都不支持递归创建,即无法在父节点不存在的情况下创建一个子节点。另外,如果一个节点已经存在了,那么创建同名节点的时候,会抛出NodeExistsException异常。

目前,ZooKeeper的节点内容只支持字节数组(byte[])类型,也就是说,ZooKeeper不负责为节点内容进行序列化,开发人员需要自己使用序列化工具将节点内容进行序列化和反序列化。对于字符串,可以简单地使用“string”.getBytes()生成一个字节数组;对于其他复杂对象,可以使用Hessian或是Kryo等专门的序列化工具来进行序列化。

关于ACL权限控制,如果你的应用场景没有太高的权限要求,那么可以不关注这个参数,只需要在acl参数中传入参数Ids.OPEN_ACL_UNSAFE,这就表明之后对这个节点的任何操作都不受权限控制。