一、Demo用法
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.2.10:7000");
RedissonClient redisson = Redisson.create(config);
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("dest_queue1");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
new Thread() {
public void run() {
while(true) {
try {
//阻塞队列有数据就返回,否则wait
System.err.println( blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();
for(int i=1;i<=5;i++) {
// 向阻塞队列放入数据
delayedQueue.offer("fffffffff"+i, 13, TimeUnit.SECONDS);
}
}
上面构造了Redisson 阻塞延时队列,然后向里面塞了5条数据,都是13秒后到期。我们先不启动程序,先打开redis执行:
[root@localhost redis-cluster]# redis-cli -c -p 7000 -h 172.29.2.10 --raw
172.29.2.10:7000> monitor
OK
monitor 命令可以监控redis执行了哪些命令,注意线上不要乱搞,耗性能的。然后我们启动程序,观察redis执行命令情况,这里分为三个阶段:
1.1 第一介段:
客户端程序启动,offer方法执行之前 ,redis服务会收到如下redis命令:
1610452446.652126 [0 172.29.2.194:65025] "SUBSCRIBE" "redisson_delay_queue_channel:{dest_queue1}"
1610452446.672009 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452442403" "limit" "0" "2"
1610452446.672018 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452446.673896 [0 172.29.2.194:65034] "BLPOP" "dest_queue1" "0"
SUBSCRIBE
这里订阅了一个固定的队列 redisson_delay_queue_channel:{dest_queue1}, 就是为了开启进程里面的延时任务,很重要,redisson延时取数据都靠它了。后面会说。
zrangebyscore
zrangebyscore用法扫盲
>> zrangebyscore key min max [WITHSCORES] [LIMIT offset count]
分页获取指定区间内(min - max),带有分数值(可选)的有序集成员的列表。
redisson_delay_queue_timeout:{dest_queue1} 是一个zset,当有延时数据存入Redisson队列时,就会在此队列中插入 数据,排序分数为延时的时间戳。
zrangebyscore就是取出前2条(源码是100条,如下图)过了当前时间的数据。如果取的是0的话就执行下面的zrange, 这里程序刚启动肯定是0(除非是之前的队列数据没有取完)。之所以在刚启动时 这样取数据就是为了把上次进程宕机后没发完的数据发完。
zrange
取出第一个数,也就是判断上面的还有不有下一页。
BLPOP
移出并获取 dest_queue1 列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止 , 这里显然没有元素 ,就会一直阻塞。
1.2 第二介段
执行offer向Redisson队列设置值,这个阶段我们发现redis干了下面事情:
1610452446.684465 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455407" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452446.684480 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452446.684492 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.684498 [0 lua] "publish" "redisson_delay_queue_channel:{dest_queue1}" "1610452455407"
1610452446.687922 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455422" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687943 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452446.687958 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.690478 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455424" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452446.690492 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452446.690502 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.692661 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455427" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452446.692674 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452446.692683 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
1610452446.696054 [0 lua] "zadd" "redisson_delay_queue_timeout:{dest_queue1}" "1610452455429" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452446.696081 [0 lua] "rpush" "redisson_delay_queue:{dest_queue1}" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452446.696098 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0"
客户端设置了5条数据。上面也可以看出来。
zadd
往我们zset里面设置 数据截止的时间戳(当前执行的时间戳+延时的时间毫秒值),内容为我们的ffffff1 ,不过特殊编码了,加了点什么,不用管。
rpush
同步一份数据到list队列,这里也不知道干嘛的,先放到这里。
zrange+publish
取出排序好的第一个数据,也就是最临近要触发的数据,然后发送通知 (之前订阅了的客户端,可能是微服务就有多个客户端),内容为将要触发的时间。客户端收到通知后,就在自己进程里面开启延时任务(HashedWheelTimer),到时间后就可以从redis取数据发送。
后面又是我们的5条循环的设置数据 zadd...
1.3第三介段:
到延时时间取redis数据
1610452459.680953 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455416" "limit" "0" "2"
1610452459.680967 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff1"
1610452459.680976 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452459.680984 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" ":\xdf\x0eO\x8c\xa7\xd4C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff1"
1610452459.680991 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES" // 判断是否有值
1610452459.745813 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455480" "limit" "0" "2"
1610452459.745829 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff2"
1610452459.745837 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2"
1610452459.745845 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff3"
1610452459.745848 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452459.745855 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "e\xfd\xfe?j?\xdbC\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff2" "\x80J\x01j\x11\xee\xda\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff3"
1610452459.745864 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.756909 [0 172.29.2.194:65026] "BLPOP" "dest_queue1" "0"
1610452459.758092 [0 lua] "zrangebyscore" "redisson_delay_queue_timeout:{dest_queue1}" "0" "1610452455493" "limit" "0" "2"
1610452459.758108 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff4"
1610452459.758114 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4"
1610452459.758121 [0 lua] "rpush" "dest_queue1" "\x04>\nfffffffff5"
1610452459.758124 [0 lua] "lrem" "redisson_delay_queue:{dest_queue1}" "1" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452459.758133 [0 lua] "zrem" "redisson_delay_queue_timeout:{dest_queue1}" "v\xb5\xd0r\xb48\xd4\xc3\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff4" "\xe7\a\x8b\xee\t-\x94C\r\x00\x00\x00\x00\x00\x00\x00\x04>\nfffffffff5"
1610452459.758143 [0 lua] "zrange" "redisson_delay_queue_timeout:{dest_queue1}" "0" "0" "WITHSCORES"
1610452459.759030 [0 172.29.2.194:65037] "BLPOP" "dest_queue1" "0"
1610452459.760933 [0 172.29.2.194:65036] "BLPOP" "dest_queue1" "0"
1610452459.763913 [0 172.29.2.194:65038] "BLPOP" "dest_queue1" "0"
1610452459.765999 [0 172.29.2.194:65039] "BLPOP" "dest_queue1" "0"
这个阶段是由客户端进程里面的延时任务执行的,延时任务是在第二阶段构造的,已经说了(通过redis的订阅/发布实现)。
zrangebyscore
取出前2条到时间的数据,第一阶段已说。
rpush
将上面取到的数据push到阻塞队列,注意我们第一阶段已经监听了这个阻塞队列
"BLPOP" "dest_queue1" "0"
所以这里就会通知客户端取数据。
lrem + zrem
将取完的数据删掉。
zrange
取zset第一个数据,有的话继续上面逻辑取数据,否则进入下面。
BLPOP
继续监听这个阻塞队列。以便下次用。
1.4 小结
客户端启动,redisson先订阅一个key,同时 BLPOP key 0 无限监听一个阻塞队列(等里面有数据了就返回)。 当有数据put时,redisson先把数据放到一个zset集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的key,发布内容为数据到期的timeout,此时客户端进程开启一个延时任务,延时时间为发布的timeout。 客户端进程的延时任务到了时间执行,从zset分页取出过了当前时间的数据,然后将数据rpush到第一步的阻塞队列里。然后将当前数据从zset移除,取完之后,又执行 BLPOP key 0 无限监听一个阻塞队列。 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take方法。于是,我们就收到了数据。
redisson不是通过轮询zset的,将延时任务执行放到进程里面实现,只有到时间才会取redis zset。redisson里面还有很多异常,重试机制 。
二、redisson延时队列优化
由于我在线上使用了redisson延时队列,在数据量小的时候表现很佳也很稳定,但我们瞬时流量特别大,发生了到了延时时间了还给我延时十几分钟的情况,这个是我万万没想到的。
当我设置了14511条数据到redisson延时队列时,取出来的时间在本身的延时时间上还延时了198636多毫秒,而且时间随着数据增加而增加。 我们线上是微信自动发消息业务,这样会导致你跟你女朋友推晚安消息,结果她第二天早上收到,然后她就认为你勾搭上了美国的妞,和你分手,并暗暗自喜到老娘早就有外遇了,就盼着这天了。
为了你的性福,于是我想到了一个优化的法子,。需要自己定制开发。
优化后的测试结果:
代码结构:
具体代码请看这篇文章
redisson使用的是netty里面的延时任务io.netty.util.HashedWheelTimer
三、HashedWheelTimer 实现原理
HashedWheelTimer本质是一种类似延迟任务队列的实现,适用于对时效性不高的,可快速执行的,大量这样的“小”任务,能够做到高性能,低消耗
redisson用的 org.redisson.connection.MasterSlaveConnectionManager
// 初始化 timer
protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{config.getRetryInterval(), config.getTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
if (minTimeout % 100 != 0) {
minTimeout = (minTimeout % 100) / 2;
} else if (minTimeout == 100) {
minTimeout = 50;
} else {
minTimeout = 100;
}
// minTimeout 为100
timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, false);
connectionWatcher = new IdleConnectionWatcher(this, config);
subscribeService = new PublishSubscribeService(this, config);
}
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
System.err.println(time + " HadLuo ======================newTimeout==================" + task + " , "+ delay+"ms");
return timer.newTimeout(task, delay, unit);
} catch (IllegalStateException e) {
if (isShuttingDown()) {
return DUMMY_TIMEOUT;
}
throw e;
}
}
抽象出来就是这样:
HashedWheelTimer timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), 100,
TimeUnit.MILLISECONDS, 1024, false);
// 构建一个延时任务
timer.newTimeout((time) -> {
System.err.println("到了12s后了,该娶媳妇了~");
}, 12, TimeUnit.SECONDS);