天天看点

redis客户端、分布式锁及数据一致性

  Redis Java客户端有很多的开源产品比如Redission、Jedis、lettuce等。

  Jedis是Redis的Java实现的客户端,其API提供了比较全面的Redis命令的支持;Redisson实现了分布式和可扩展的Java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序、事务、管道、分区等Redis特性。Redisson主要是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。由于常用的是jedis,所以这边使用jedis作为演示。

jedis-sentinel原理:

  客户端通过连接到哨兵集群,通过发送Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME 命令,从哨兵机器中询问master节点的信息,拿到master节点的ip和端口号以后,再到客户端发起连接。连接以后,需要在客户端建立监听机制,当master重新选举之后,客户端需要重新连接到新的master节点。

  构造器代码如下:

public JedisSentinelPool(String masterName, Set<String> sentinels, final GenericObjectPoolConfig poolConfig, int timeout, final String password, final int database) {

        this.poolConfig = poolConfig;
        this.timeout = timeout;
        this.password = password;
        this.database = database;

        HostAndPort master = initSentinels(sentinels, masterName);
        initPool(master);
}
      

  其中 masterName 为配置 sentinels的时候再sentinel.conf 所配置的master的名称。 

initSentinels方法:

private HostAndPort initSentinels(Set<String> sentinels, final String masterName) {

    HostAndPort master = null;
    boolean sentinelAvailable = false;

    log.info("Trying to find master from available Sentinels...");
    // 有多个sentinels,遍历这些个sentinels
    for (String sentinel : sentinels) {
     // host:port表示的sentinel地址转化为一个HostAndPort对象。
      final HostAndPort hap = HostAndPort.parseString(sentinel);

      log.debug("Connecting to Sentinel {}", hap);

      Jedis jedis = null;
      try {
        // 连接到sentinel
        jedis = new Jedis(hap);
        // 根据masterName得到master的地址,返回一个list,host= list[0], port =// list[1]
        List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName);

        // connected to sentinel...
        sentinelAvailable = true;

        if (masterAddr == null || masterAddr.size() != 2) {
          log.warn("Can not get master addr, master name: {}. Sentinel: {}", masterName, hap);
          continue;
        }
        // 如果在任何一个sentinel中找到了master,不再遍历sentinels
        master = toHostAndPort(masterAddr);
        log.debug("Found Redis master at {}", master);
        break;
      } catch (JedisException e) {
        // resolves #1036, it should handle JedisException there's another chance
        // of raising JedisDataException
        log.warn(
          "Cannot get master address from sentinel running @ {}. Reason: {}. Trying next one.", hap,
          e.toString());
      } finally {
        if (jedis != null) {
          jedis.close();
        }
      }
    }
// 到这里,如果master为null,则说明有两种情况,一种是所有的sentinels节点都down掉了,一种是master节点没有被存活的sentinels监控到
    if (master == null) {
      if (sentinelAvailable) {
        // can connect to sentinel, but master name seems to not
        // monitored
        throw new JedisException("Can connect to sentinel, but " + masterName
            + " seems to be not monitored...");
      } else {
        throw new JedisConnectionException("All sentinels down, cannot determine where is "
            + masterName + " master is running...");
      }
    }
    //如果走到这里,说明找到了master的地址
    log.info("Redis master running at " + master + ", starting Sentinel listeners...");
    //启动对每个sentinels的监听为每个sentinel都启动了一个监听者MasterListener。MasterListener本身是一个线程,它会去订阅sentinel上关于master节点地址改变的消息。
    for (String sentinel : sentinels) {
      final HostAndPort hap = HostAndPort.parseString(sentinel);
      MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort());
      // whether MasterListener threads are alive or not, process can be stopped
      masterListener.setDaemon(true);
      masterListeners.add(masterListener);
      masterListener.start();
    }

    return master;
  }
      

  可以看到

initSentinels

方法的参数有一个masterName,就是我们所需要查找的master的名字。一开始,遍历多个sentinels,一个一个连接到sentinel,去询问关于masterName的消息,可以看到是通过

jedis.sentinelGetMasterAddrByName()

方法去连接sentinel,并询问当前的master的地址。点进这个方法去看看,源代码是这样写的:从哨兵节点获取master信息的方法:调用的是与Jedis绑定的client去发送一个"get-master-addr-by-name"命令

public List<String> sentinelGetMasterAddrByName(final String masterName) {
    client.sentinel(Protocol.SENTINEL_GET_MASTER_ADDR_BY_NAME, masterName);
    final List<Object> reply = client.getObjectMultiBulkReply();
    return BuilderFactory.STRING_LIST.build(reply);
  }
      

  调用

initPool

方法(构造函数中调用),那么会初始化Jedis实例创建工厂,如果不是第一次调用(

MasterListener

中调用),那么只对已经初始化的工厂进行重新设置。Jedis的JedisSentinelPool的实现仅仅适用于单个master-slave。

Jedis-cluster原理:

   先来看一下他的连接方式:

Set<HostAndPort> hostAndPorts=new HashSet<>();
HostAndPort hostAndPort=new HostAndPort("192.168.11.153",7000);
HostAndPort hostAndPort1=new HostAndPort("192.168.11.153",7001);
HostAndPort hostAndPort2=new HostAndPort("192.168.11.154",7003);
HostAndPort hostAndPort3=new HostAndPort("192.168.11.157",7006);
hostAndPorts.add(hostAndPort);
hostAndPorts.add(hostAndPort1);
hostAndPorts.add(hostAndPort2);
hostAndPorts.add(hostAndPort3);
JedisCluster jedisCluster=new JedisCluster(hostAndPorts,6000);
jedisCluster.set("wuzz","hello");
      

程序启动初始化集群环境:

  1)、读取配置文件中的节点配置,无论是主从,无论多少个,只拿第一个,获取redis连接实例

  2)、用获取的redis连接实例执行clusterNodes()方法,实际执行redis服务端cluster nodes命令,获取主从配置信息

  3)、解析主从配置信息,先把所有节点存放到nodes的map集合中,key为节点的ip:port,value为当前节点的jedisPool

  4)、解析主节点分配的slots区间段,把slot对应的索引值作为key,第三步中拿到的jedisPool作为value,存储在slots的map集合中就实现了slot槽索引值与jedisPool的映射,这个jedisPool包含了master的节点信息,所以槽和几点是对应的,与redis服务端一致

从集群环境存取值:

1)、把key作为参数,执行CRC16算法,获取key对应的slot值

2)、通过该slot值,去slots的map集合中获取jedisPool实例

3)、通过jedisPool实例获取jedis实例,最终完成redis数据存取工作

分布式锁的实现:

  分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。本篇博客将介绍第二种方式,基于Redis实现分布式锁。

  关于锁,其实我们或多或少都有接触过一些,比如synchronized、 Lock这些,这类锁的目的很简单,在多线程环境下,对共享资源的访问造成的线程安全问题,通过锁的机制来实现资源访问互斥。那么什么是分布式锁呢?或者为什么我们需要通过Redis来构建分布式锁,其实最根本原因就是Score(范围),因为在分布式架构中,所有的应用都是进程隔离的,在多进程访问共享资源的时候我们需要满足互斥性,就需要设定一个所有进程都能看得到的范围,而这个范围就是Redis本身。所以我们才需要把锁构建到Redis中。Redis里面提供了一些比较具有能够实现锁特性的命令,比如SETEX(在键不存在的情况下为键设置值),那么我们可以基于这个命令来去实现一些简单的锁的操作.

  首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  1. 互斥性。在任意时刻,只有一个客户端能持有锁。
  2. 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  3. 具有容错性。只要大部分的Redis节点正常运行,客户端就可以加锁和解锁。
  4. 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

1.引入依赖:

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.0.0</version>
</dependency>
      

2.来一个连接 redis 的工具类:

public class JedisConnectionUtils {
	private static JedisPool pool=null;
    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(100);
        pool=new JedisPool(jedisPoolConfig,"192.168.254.136",6399,5000,"wuzhenzhao");
    }
    public static Jedis getJedis(){
        return pool.getResource();
    }
}      

3.加锁:jedis.set(String key, String value, String nxxx, String expx, int time),这个set()方法一共有五个形参:

  • 第一个为key,我们使用key来当锁,因为key是唯一的。
  • 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
  • 第五个为time,与第四个参数相呼应,代表key的过期时间。
private final String LOCK_NAME = "DISTRIBUTEDLOCK";

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * @param acquireTimeout 获得锁的超时时间
     * @param lockTimeout    锁本身的过期时间
     * @return
     */
    public String acquireLock(long acquireTimeout, long lockTimeout) {
        String identifier = UUID.randomUUID().toString();//保证释放锁的时候是同一个持有锁的人
        String lockKey = "lock:" + LOCK_NAME;
        int lockExpire = (int) (lockTimeout / 1000);
        Jedis jedis = null;
        try {//获取连接
            jedis = JedisConnectionUtils.getJedis();
            long end = System.currentTimeMillis() + acquireTimeout;
            //获取锁的限定时间
            while (System.currentTimeMillis() < end) {
                String result = jedis.set(lockKey, identifier, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, lockTimeout);
                if (LOCK_SUCCESS.equals(result)) {
                    return identifier;
                }
                //表示没有超时时间
                if (jedis.ttl(lockKey) == -1) {
                    jedis.expire(lockKey, lockExpire); //设置超时时间
                }
                try {
                    //等待片刻后进行获取锁的重试
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            jedis.close(); //回收
        }
        return null;
    }      

3.释放锁

public boolean releaseLock(String lockName,String identifier){
        System.out.println(lockName+"开始释放锁:"+identifier);
        String lockKey="lock:"+lockName;
        Jedis jedis=null;
        boolean isRelease=false;
        try{
            jedis=JedisConnectionUtils.getJedis();
            while(true){
            	//Watch 命令用于监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断
                jedis.watch(lockKey);
                //判断是否为同一把锁
                if(identifier.equals(jedis.get(lockKey))){
                	//标记事务开始
                    Transaction transaction=jedis.multi();
                    transaction.del(lockKey);
                    if(transaction.exec().isEmpty()){
                        continue;
                    }
                    isRelease=true;
                }else {
                	//TODO 异常
                }
                jedis.unwatch();
                break;
            }
        }finally {
            jedis.close();
        }
        return  isRelease;
    }
      

5.测试:

public class UnitTest  extends Thread{

    @Override
    public void run() {
        while(true){
        	RedisDemo distributedLock=new RedisDemo();
            String rs=distributedLock.acquireLock("updateOrder",
                    2000,5000);
            if(rs!=null){
                System.out.println(Thread.currentThread().getName()+"-> 成功获得锁:"+rs);
                try {
                    Thread.sleep(1000);
                    distributedLock.releaseLock("updateOrder",rs);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                break;
            }
        }
    }

    public static void main(String[] args) {
        UnitTest unitTest=new UnitTest();
        for(int i=0;i<10;i++){
            new Thread(unitTest,"tName:"+i).start();
        }
    }
}
      

  如果你的项目中Redis是多机部署的,那么可以尝试使用

Redisson

实现分布式锁,这是Redis官方提供的Java组件

管道模式:

  Redis服务是一种C/S模型,提供请求-响应式协议的TCP服务,所以当客户端发起请求,服务端处理并返回结果到客户端,一般是以阻塞形式等待服务端的响应,但这在批量处理连接时延迟问题比较严重,所以Redis为了提升或弥补这个问题,引入了管道技术:可以做到服务端未及时响应的时候,客户端也可以继续发送命令请求,做到客户端和服务端互不影响,服务端并最终返回所有服务端的响应,大大提高了C/S模型交互的响应速度上有了质的提高。

  使用方法:

Jedis jedis=new Jedis("192.168.254.136",6399);
Pipeline pipeline=jedis.pipelined();
    for(int i=0;i<1000;i++){
    pipeline.incr("test");
}
pipeline.sync();
      

Redis缓存与数据一致性问题:

  对于读多写少的高并发场景,我们会经常使用缓存来进行优化。比如说支付宝的余额展示功能,实际上99%的时候都是查询,1%的请求是变更(除非是土豪,每秒钟都有收入在不断更改余额),所以,我们在这样的场景下,可以加入缓存,用户->余额

  那么基于上面的这个出发点,问题就来了,当用户的余额发生变化的时候,如何更新缓存中的数据,也就是说。我是先更新缓存中的数据再更新数据库的数据;还是修改数据库中的数据再更新缓存中的数据

  数据库的数据和缓存中的数据如何达到一致性?首先,可以肯定的是,redis中的数据和数据库中的数据不可能保证事务性达到统一的,这个是毫无疑问的,所以在实际应用中,我们都是基于当前的场景进行权衡降低出现不一致问题的出现概率。

更新缓存还是让缓存失效?

  更新缓存表示数据不但会写入到数据库,还会同步更新缓存; 而让缓存失效是表示只更新数据库中的数据,然后删除缓存中对应的key。那么这两种方式怎么去选择?这块有一个衡量的指标。

1. 如果更新缓存的代价很小,那么可以先更新缓存,这个代价很小的意思是我不需要很复杂的计算去获得最新的余额数字。

2. 如果是更新缓存的代价很大,意味着需要通过多个接口调用和数据查询才能获得最新的结果,那么可以先淘汰缓存。淘汰缓存以后后续的请求如果在缓存中找不到,自然去数据库中检索。

先操作数据库还是先操作缓存?

  当客户端发起事务类型请求时,假设我们以让缓存失效作为缓存的的处理方式,那么又会存在两个情况,

1. 先更新数据库再让缓存失效

2. 先让缓存失效,再更新数据库

  前面我们讲过,更新数据库和更新缓存这两个操作,是无法保证原子性的,所以我们需要根据当前业务的场景的容忍性来选择。也就是如果出现不一致的情况下,哪一种更新方式对业务的影响最小,就先执行影响最小的方案。

最终一致性的解决方案:

  对于分布式系统的数据最终一致性问题,我们可以引入消息中间件,对于失败的缓存更新存入对应的 broker,并对其进行订阅,当有消息来了,我们可以对由于网络等非程序错误的异常缓存更新进行重试更新:

 关于缓存雪崩的解决方案:

  当缓存大规模渗透在整个架构中以后,那么缓存本身的可用性讲决定整个架构的稳定性。那么接下来我们来讨论下缓存在应用过程中可能会导致的问题。

缓存雪崩:

  缓存雪崩是指设置缓存时采用了相同的过期时间,导致缓存在某一个时刻同时失效,或者缓存服务器宕机宕机导致缓存全面失效,请求全部转发到了DB层面,DB由于瞬间压力增大而导致崩溃。缓存失效导致的雪崩效应对底层系统的冲击是很大的。

解决方式:

1. 对缓存的访问,如果发现从缓存中取不到值,那么通过加锁或者队列的方式保证缓存的单进程操作,从而避免失效时并并发请求全部落到底层的存储系统上;但是这种方式会带来性能上的损耗

2. 将缓存失效的时间分散,降低每一个缓存过期时间的重复率

3. 如果是因为缓存服务器故障导致的问题,一方面需要保证缓存服务器的高可用、另一方面,应用程序中可以采用多级缓存

缓存穿透:

  缓存穿透是指查询一个根本不存在的数据,缓存和数据源都不会命中。出于容错的考虑,如果从数据层查不到数据则不写入缓存,即数据源返回值为 null 时,不缓存 null。缓存穿透问题可能会使后端数据源负载加大,由于很多后端数据源不具备高并发性,甚至可能造成后端数据源宕掉。

解决方式

1. 如果查询数据库也为空,直接设置一个默认值存放到缓存,这样第二次到缓冲中获取就有值了,而不会继续访问数据库,这种办法最简单粗暴。比如,”key” , “&&”。在返回这个&&值的时候,我们的应用就可以认为这是不存在的key,那我们的应用就可以决定是否继续等待继续访问,还是放弃掉这次操作。如果继续等待访问,过一个时间轮询点后,再次请求这个key,如果取到的值不再是&&,则可以认为这时候key有值了,从而避免了透传到数据库,从而把大量的类似请求挡在了缓存之中。

2. 根据缓存数据Key的设计规则,将不符合规则的key进行过滤采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的BitSet中,不存在的数据将会被拦截掉,从而避免了对底层存储系统的查询压力。

布隆过滤器:

  布隆过滤器是Burton Howard Bloom在1970年提出来的,一种空间效率极高的概率型算法和数据结构,主要用来判断一个元素是否在集合中存在。因为他是一个概率型的算法,所以会存在一定的误差,如果传入一个值去布隆过滤器中检索,可能会出现检测存在的结果但是实际上可能是不存在的,但是肯定不会出现实际上不存在然后反馈存在的结果。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

  bitmap:

  所所谓的BitMap就是用一个bit位来标记某个元素所对应的value,而key即是该元素,由于BitMap使用了bit位来存储数据,因此可以大大节省存储空间.

  基本思想:

  这此我用一个简单的例子来详细介绍BitMap算法的原理。假设我们要对0-7内的5个元素(4,7,2,5,3)进行排序(这里假设元素没有重复)。我们可以使用BitMap算法达到排序目的。要表示8个数,我们需要8个byte。

  1.首先我们开辟一个字节(8byte)的空间,将这些空间的所有的byte位都设置为0

  2.然后便利这5个元素,第一个元素是4,因为下边从0开始,因此我们把第五个字节的值设置为1

  3.然后再处理剩下的四个元素,最终8个字节的状态如下图

  

  4.现在我们遍历一次bytes区域,把值为1的byte的位置输出(2,3,4,5,7),这样便达到了排序的目的

  从上面的例子我们可以看出,BitMap算法的思想还是比较简单的,关键的问题是如何确定10进制的数到2进制的映射图

  假设需要排序或则查找的数的总数N=100000000,BitMap中1bit代表一个数字,1个int = 4Bytes = 4*8bit = 32 bit,那么N个数需要N/32 int空间。所以我们需要申请内存空间的大小为int a[1 + N/32],其中:a[0]在内存中占32为可以对应十进制数0-31,依次类推:

  a[0]-----------------------------> 0-31

  a[1]------------------------------> 32-63

  a[2]-------------------------------> 64-95

  a[3]--------------------------------> 96-127

  ......................................................

  那么十进制数如何转换为对应的bit位,下面介绍用位移将十进制数转换为对应的bit位:

  1.求十进制数在对应数组a中的下标

  十进制数0-31,对应在数组a[0]中,32-63对应在数组a[1]中,64-95对应在数组a[2]中………,使用数学归纳分析得出结论:对于一个十进制数n,其在数组a中的下标为:a[n/32]

  2.求出十进制数在对应数a[i]中的下标

  例如十进制数1在a[0]的下标为1,十进制数31在a[0]中下标为31,十进制数32在a[1]中下标为0。 在十进制0-31就对应0-31,而32-63则对应也是0-31,即给定一个数n可以通过模32求得在对应数组a[i]中的下标。

  3.位移

  对于一个十进制数n,对应在数组a[n/32][n%32]中,但数组a毕竟不是一个二维数组,我们通过移位操作实现置1

  a[n/32] |= 1 << n % 32 

  移位操作: 

  a[n>>5] |= 1 << (n & 0x1F)

  n & 0x1F 保留n的后五位 相当于 n % 32 求十进制数在数组a[i]中的下标

  布隆过滤器就是基于这么一个原理来实现的。假设集合里面有3个元素{x, y, z},哈希函数的个数为3。首先将位数组进行初始化,将里面每个位都设置位0。对于集合里面的每一个元素,将元素依次通过3个哈希函数进行映射,每次映射都会产生一个哈希值,这个值对应位数组上面的一个点,然后将位数组对应的位置标记为1。查询W元素是否存在集合中的时候,同样的方法将W通过哈希映射到位数组上的3个点。如果3个点的其中有一个点不为1,则可以判断该元素一定不存在集合中。反之,如果3个点都为1,则该元素可能存在集合中

   接下来按照该方法处理所有的输入对象,每个对象都可能把bitMap中一些位置设置为1,也可能会遇到已经是1的位置,遇到已经为1的让他继续为1即可。处理完所有的输入对象之后,在bitMap中可能已经有相当多的位置已经被为1。至此,一个布隆过滤器生成完成,这个布隆过滤器代表之前所有输入对象组成的集合。

  如何去判断一个元素是否存在bit array中呢? 原理是一样,根据k个哈希函数去得到的结果,如果所有的结果都是1,表示这个元素可能(假设某个元素通过映射对应下标为4,5,6这3个点。虽然这3个点都为1,但是很明显这3个点是不同元素经过哈希得到的位置,因此这种情况说明元素虽然不在集合中,也可能对应的都是1)存在。 如果一旦发现其中一个比特位的元素是0,表示这个元素一定不存在.