更多请移步我的博客
背景
项目中存在以下场景需要延迟触发一些事件:
- 订单在未支付状态下30分钟后自动关闭;
- 订单超过15天未主动确认收货需要自动确认收货;
- 商品价格需要在不同的时间段生效不同的价格方案等。
以上场景下需要有一个相对平台化的服务来满足,而不必每个项目自己做定时任务去进行轮询。
解刨延迟/定时任务
构成一个任务有两个要素:执行时间;执行逻辑。对任务规划者而言,并不关心任务执行逻辑,规划者只要在既定的时间触发该任务,但既然作为一个规划者,就必须具备任务的基本维护能力:新增,删除/取消,到期,查找。
那么一个想要实现规划者就必须考虑两件事:1.怎样即时发现时间到期;2.怎样提高任务的维护效率,即怎么存储任务来保证对任务的高效操作。
本文只关注延时队列中对任务的基本规划能力的实现方式,不涉及延时系统的设计讨论,系统层面的话题太大了。
Rocketmq延迟队列实现
Rocketmq的定时队列通过一个叫做“SCHEDULE_TOPIC_XXXX”的Topic来实现,这个Topic用来处理需要被延迟发送的消息。在Rocketmq中延迟消息被分为几个延迟级别,每个延迟级别分别对应“SCHEDULE_TOPIC_XXXX”Topic下一个延迟队列,默认延迟级别为:“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”。在Broker启动时,会启动相对应队列的线程来处理各个延迟队列的延迟消息。
盗用
艾瑞克
一次分享中的图来直观感受下延迟队列的实现。
Rocketmq处理通过消息体的扩展字段
DELAY
来区分Producer是否投递的是延迟消息,如果
DELAY
大于0,即确定是延迟消息,Broker会备份源消息的topic和queueId,并将其替换为对应延迟队列的信息,然后将修改后的消息落盘到commitLog,
DefaultMessageStore#ReputMessageService
Reput线程将消息分发至对应Topic的消息队列(messageQueue),延迟队列被
ScheduleMessageService
消费,延迟消息到期后会被封装为一个新消息(恢复其源Topic及queueId等信息)再次走消息的投递流程到commitLog,然后被Reput到最初要投递的队列,在整个过程中
ScheduleMessageService
同时扮演了Consumer和Producer的角色,区分好这两种角色后再来看
ScheduleMessageService
这段代码会清楚不少。下面列出的代码有所删减改,目的是为了表达核心逻辑。
// ScheduleMessageService
public void start() {
if (started.compareAndSet(false, true)) {
this.timer = new Timer("ScheduleMessageTimerThread", true);
// 给不同级别的队列启动对应的任务线程
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
// 定时持久化消费进度
if (started.get()) ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
}
/**
* ScheduleMessageService的内部类
*/
class DeliverDelayedMessageTimerTask extends TimerTask {
public void executeOnTimeup() {
// 找到对应的队列
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
// 如果队列不存在,DELAY_FOR_A_WHILE后重新尝试。todo: 什么情况下会出现队列为null呢???
if (cq == null) {
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
return;
}
// 从指定位置拉取队列中的可用消息
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// 队列中的消息体大小均为CQ_STORE_UNIT_SIZE,依次进行遍历
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
// 延迟消息的tagCode保存的是延时时间
long tagsCode = bufferCQ.getByteBuffer().getLong();
// omit tagsCode 校正
long now = System.currentTimeMillis();
// 交付时间计算
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
// 下次开始拉取消息的位置
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// 如果到了交付时间,则从commitLog中拉取出消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) {
try {
// 封装称为一条新的消息,主要是将在topic、queueId、tagCode等替换为源消息的值
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.writeMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
} else {
// reput失败的处理,主要是重设定时任务及持久化消费进度
return;
}
} catch (Exception e) {
//omit
}
}
} else {
// 重设定时任务及持久化消费进度
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 重设定时任务及持久化消费进度
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {
bufferCQ.release();
}
} // end of if (bufferCQ != null)
else {
// 校正消费偏移量
long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
// omit error log
}
}
}
}
此种方式数据结构类似链表,但链表中的数据天然有序,为什么呢?因为消息的延迟时间是Broker落盘时间加上延迟级别对应的时间,落盘后的消息才会被
ReputMessageService
Reput线程进行分发到指定的
MessageQueue
,而Reput线程是个单线程,整个过程FIFO,所以延迟队列天然有序。
我们再从任务的行为:新增,删除/取消,到期,查找,来看下Rocketmq延迟队列作为任务的优缺点。
- 因为Rocketmq的设计方式,注定其延迟队列只能支持特定延迟时间的特点;
- 因为Rocketmq并不是为延迟任务而生,所以它无法删除/取消一个定时任务;
- 生产一个延迟消息和生产一个普通消息的过程是一致的,因此新增一个延迟消息无非就是像broker进行消息投递,如果网络稳定,其时间消耗稳定;
-
的天然有序,保证队列头的消息一定是最先到期的,所以到期任务的检索时间稳定;MessageQueue
- 消息的查找只可根据msgId查找或者消息key查找,msgId中包含消息的物理偏移量(类似MySql的主键)可直接定位消息,而key的查找是根据key的hashCode查找索引文件获得,可能或出现查出多条的情况,需要客户端自己根据key再次过滤,除了在控制台很少在Consumer端使用key发起查询。
个人认为虽然Rocketmq具备延迟的功能,但其主要目的是为了实现“至少投递一次”语义,因为一个延迟消息处理完成会被Rocketmq落盘两次,就只是因为topic和queueId不同,如果Rocketmq只用来处理延迟消息,那么其存储的数据量延迟消息的两倍。所以如果延迟消息量不是很大并且不需要灵活的延迟时间的话,Rocketmq不失为一种好的选择。
DelayQueue
在JDK中带有定时/延迟特性的两个类:
DelayQueue
和
ScheduledThreadPoolExecutor
。这两个类名起的很好,见名知意。我们从这两个类的数据结构入手看下JDK这两个自带类如何实现任务的规划。
DelayQueue
内部使用
PriorityQueue
来存储定时/延时任务,相当于是
PriorityQueue
的一种使用场景,主要特性是当队列为空或任务时间未到期时可让拉取线程进行等待。
PriorityQueue
使用
二叉堆
来存储数据,这里不去深究
二叉堆
的定义,其特性是根节点一定是整个堆中最大/最小的点,这点是比较符合优先队列的特性,
PriorityQueue
的根节点
queue[0]
是最小的节点,称为
最小堆
。PS:
二叉堆
是
完全二叉树
或者是近似完全二叉树。
完全二叉树 | 满二叉树 | |
---|---|---|
总节点k | 2 h − 1 < = k < = 2 h − 1 2^{h-1}<= k <= 2^{h}-1 2h−1<=k<=2h−1 | 2 h − 1 2^{h}-1 2h−1 |
树高h | h = l o g 2 k + 1 h=log_{2}k+1 h=log2k+1 | h = l o g 2 ( k + 1 ) h=log_{2}(k+1) h=log2(k+1) |
PriorityQueue
用数组来实现
二叉堆
,
queue[n]
节点的两个子节点存储在
queue[2*n+1]
和
queue[2*(n+1)]
,使用指定的
Comparator
或节点本身实现比较接口
Comparable
来排序。站在
DelayQueue
的角色来看,
PriorityQueue
的排序关键字是到期时间,比较器
Comparator/Comparable
比较的就是延迟时间的大小。
PriorityQueue
关键操作的平均时间复杂度:增加
O(log(n))
,查找
O(n)
,删除=查找+移除
O(n) + O(log(n))
,到期任务检索
O(1)
。PS:就
二叉堆
本身而言还有堆合并等操作,而且不同构造方式的时间复杂度也不相同,但不是这里的关注重点。
// PriorityQueue
/**
* 队列
*/
transient Object[] queue;
/**
* 队列中元素的数量。同时用来记录下一个元素的index。
*/
private int size = 0;
/**
* 排序使用的比较器,可以不设置,默认使用元素的自然排序
*/
private final Comparator<? super E> comparator;
public boolean offer(E e) {
// omit check
// 元素在数组中的index
int i = size;
// omit grow Capacity
size = i + 1;
if (i == 0)
queue[0] = e;
else
// 构造堆结构
siftUp(i, e);
return true;
}
private void siftUp(int k, E x) {
// 是自定义比较器
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
// 按照存储规则找到自己的父节点
int parent = (k - 1) >>> 1;
Object e = queue[parent];
// 如果待插入节点不小于父节点,说明位置正确,不再继续向上比较
// 最差循环次数为堆的高度
if (comparator.compare(x, (E) e) >= 0)
break;
// 否则将父节点放在当前位置,继续向上比较
queue[k] = e;
k = parent;
}
// 进入队列
queue[k] = x;
}
/**
* 查找指定的节点
*/
private int indexOf(Object o) {
if (o != null) {
// 因为实际存储结构为数组,所以此时需要进行遍历
// 因此时间复杂度为O(n)
for (int i = 0; i < size; i++)
if (o.equals(queue[i]))
return i;
}
return -1;
}
在JDK中使用
ScheduledThreadPoolExecutor
同样使用了
最小堆
来规划任务,但与
PriorityQueue
不同,
ScheduledThreadPoolExecutor
对任务元素进行了优化,让任务本身持有自己在数组中的index,这样将取消操作时间复杂度降到了O(1),但如果
removeOnCancel
参数配置为true时,取消操作会引起删除操作,此时就增加了堆的维护。该配置默认为false,这延迟了垃圾回收,会导致无谓的内存占用,前提是任务存在极大可能在开始前被取消。
// ScheduledThreadPoolExecutor内部类
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
/** 延迟时间,单位:纳秒 */
private long time;
/**
* 任务的重复时间,单位:纳秒。
* 正数表明固定速率执行。
* 负数说明固定延时时间执行。
* 0说明该任务不需重复。
*/
private final long period;
/** 任务本身,可通过reExecutePeriodic方法重新入队 */
RunnableScheduledFuture<V> outerTask = this;
/**
* 任务在延迟队列中的index,用来支持快速取消任务。
*/
int heapIndex;
}
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 数组初识长度
private static final int INITIAL_CAPACITY = 16;
// 任务数组
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
// 队列中元素的数量。同时用来记录下一个元素的index。
private int size = 0;
/**
* leader是指等待在queue上的第一个线程,该线程等待时间为第一个任务到期的时间,
* 其余的等待线程无限期等待,直到leader把其唤醒。此处为Leader-Follower模式,目的是减少不必要的等待时间。
* 更多释义参见远吗注释。
*/
private Thread leader = null;
/**
* 当队列头部任务变得可用或者一个新线程称为leader是会在该条件上发出信号。
*/
private final Condition available = lock.newCondition();
// 对数组的增加删除操作同PriorityQueue相似
}
TimeWheel
以上两种实现方式有一个共同点,他们并没有按照预想将任务和延迟时间分离,接下来我们要讨论的时间轮就是按照这个思路来实现任务的规划,看一下时间轮如何来存储和存储/规划任务。
网上盗图来直观感受下时间轮。
上图展示的一个多层时间轮,红线部分表示任务随时间流逝迁移的过程,直到最后被执行。
时间轮是对时钟的一个模拟,在时间轮中有以下几点需要注意:
- Tick,一次时间推进,每次推进会检查/执行超时任务。
- TickDuration,时间轮推进的最小单元,每隔TickDuration会有一次Tick,它决定了时间轮的精确程度。
- Bucket(TicksPerWheel),上图中的每一隔就是一个Bucket,表示一个时间轮可以有多少个Tick,它是存储任务的最小单元。
- 上层时间轮的TickDuration是下层时间轮的表示时间的最大范围,即:父TickDuration = 子TickDuration * 子Bucket梳理
Netty和Kafka使用时间轮来管理超时任务,因为具体场景不同实现也不同。
Netty
Netty的时间轮用来管理网络连接的超时,实际应用过程中网络超时时间一般不会很长,Netty采用单时间轮来规划超时任务。
public class HashedWheelTimer implements Timer {
// 默认 100ms
private final long tickDuration;
// Bucket,使用数组实现,默认 512
private final HashedWheelBucket[] wheel;
// 待加入时间轮的任务
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 被取消的任务
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
// 新增一个任务
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 计算超时时间
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// 构建任务
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
// 添加到 timeouts 链表
timeouts.add(timeout);
return timeout;
}
// Bucket用双向链表组织数据
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
}
// 超时任务结构
private static final class HashedWheelTimeout implements Timeout {
// 任务本身
private final TimerTask task;
// 超时时间
private final long deadline;
// 剩余轮询次数
long remainingRounds;
// 被添加到的Bucket
HashedWheelBucket bucket;
}
}
上面是Netty时间轮主要的数据结构,从源码中我们可以看出Netty为提升效率废了不少心思,比如:
PlatformDependent.newMpscQueue()
来特定解决多生产单消费的场景,有兴趣的可以看下,这不是本文重点。
前面我们说了,Netty是单时间轮,当规划的任务时间超过一圈怎么办呢?我们看到超时任务
HashedWheelTimeout
中有一个
remainingRounds
字段,这个字段记录了该任务在被遍历多少次时可以被过期,任务每被遍历一次该字段值减1,当其值不大于0时,表示可以被过期。
Netty使用
数组 + 双向链表
的方式来组织时间轮,对于添加/取消操作仅做了记录,真正的操作实际发生在下一个Tick。我们来看下Netty版的时间轮对任务规划的时间复杂度。
-
添加任务
通过
方法新增任务,Netty仅仅把任务放到newTimeout
链表的队尾,时间相对稳定,可看作O(1)。timeouts
-
取消任务
取消是
任务本身提供的,在调用HashedWheelTimeout
方法后,Netty仅仅修改了任务状态并把任务放到了HashedWheelTimeout#cancel()
链表的队尾,时间相对稳定,可看作O(1)。cancelledTimeouts
- 删除任务
外部不暴露该操作,发生在过期任务时,由void remove() { // 当前任务被规划到的Bucket,如果不存在说明尚未被规划,直接忽略 HashedWheelBucket bucket = this.bucket; if (bucket != null) { // 从bucket的链表中移除 bucket.remove(this); } }
任务本身提供,因为任务本身有记录当前的Bucket,所以删除操作等于是链表的一个删除操作,时间相对稳定,可看作O(1)。HashedWheelTimeout
-
查找任务
不存在该场景。
- 过期任务
时间的推进是独立的一个线程在做,该线程同时也负责过期任务的执行等操作,可简单认为此步骤操作为O(n),因为推进线程需要完全遍历// 时间推进线程 private final class Worker implements Runnable { private long tick; public void run() { do { // 获取本次过期时间 final long deadline = waitForNextTick(); if (deadline > 0) { // 本次Bucket位置 int idx = (int) (tick & mask); // 将 cancelledTimeouts 中任务从Bucket中删除 processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // 将 timeouts 中的任务添加到对应的Bucket中 transferTimeoutsToBuckets(); // 遍历当前Bucket,执行当前Bucket中的过期任务 bucket.expireTimeouts(deadline); // 记录嘀嗒次数 tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); } }
、timeouts
与cancelledTimeouts
链表,在遍历bucket
时,Netty为了避免任务过多,所以限制每次最多遍历10万个,也就是说,一个tick只能规划10万个任务,当任务量过大时,会存在超时任务执行时间延迟的现象。timeouts
我们上面有说到取消任务时,只是将任务放在了超时链表中,在下次发生时间推进时才会真正将被取消的任务从队列移除,这就导致空间的浪费,GC回收会至少延迟一个推进间隔(TickDuration)。
如今Netty在网络通信中的霸主地位不必多言,虽然Netty中时间轮的实现不是最好的但一定是满足了Netty这个特定场景的性能需要,即:没有最好的技术,只有适合的技术。这一点对平时开发设计也有借鉴意义。
Kafka
Kafka也使用
数组 + 双向链表
的方式来组织时间轮(
Timer.scala
),只是有多个数组来表示多层时间轮而已,所以其对任务的增加/删除/取消操作实际也就是对双向链表的增删操作,时间复杂度与其一致,其到期任务的过程下面我们与Netty做比较来看。
上面关于Netty实现的一些比较蛇皮的地方,在Kafka中均得到了解决,并且Kafka中使用JDK中的
DelayQueue
来帮助做时间推进,使得一个线程可以轻松管理错层时间轮的时间推进。
我们先来看他规避了Netty中哪些不太合适的地方:
- Netty单层时间轮规划超过一轮的任务时使用
来做控制,这就导致在一次推进中,当前Bucket下的任务并不是全部都是过期的,而Kafka使用多层时间轮,一个Bucket被某次推进选中,它下挂的任务行为完全一致,处理起来相对简单,Kafka目前也是由推进线程来遍历到期任务,但它也可以方便的使用fork/join方式来进一步提高处理效率。remainingRounds
- Kafka虽然也由推进线程遍历到期Bucket下的任务,但任务的执行却交给专门的线程池来执行,自己仅将该Bucket下的所有任务重新走一遍添加逻辑,以便决定任务是交给线程池执行还是下降到下层时间轮。
Kafka既然是多层时间轮,那他是如何来推进每个时间轮的时间呢?Kafka在时间推进层面跳过了时间轮这一层,直接规划到时间轮下的Bucket,Kafka将所有Bucket放在
DelayQueue
中来简化时间推进的操作,这样多个时间轮的推进任务只需要一个线程便可完成!
我们之前讲
DelayQueue
不太适合任务规划,但Kafka时间轮中的Bucket数量并不会很多,在数量不多的情况下
DelayQueue
性能还是不错的(应该是满足了Kafka的性能需求),所以此处选择了
DelayQueue
来实现时间的推进。还是应了这句话:没有最好的技术,只有适合的技术。
Kafka的时间轮实现更契合我对超时中心的认知,超时中心只关心时间是否到期并进行回调通知,并不关心和执行任务的情况,Kafka明确了角色分工,所以在海量数据下会更高效。
这里不再贴出源码分析,如果前面几种实现一一看过的话,看Kafka的实现就很容易。
扩展
文章拖的时间太久了,还有两个与定时任务相关的开源项目没来得及看,大家自行看下里面怎么实现定时功能的吧。
- Quartz Scheduler
- 当当任务调度
总结
这篇文章前后花了一个月的休息时间来做调研和整理,虽然文中有贴出部分源代码,但代码作者的巧妙构思完全不能被表达,还是建议去看源码。
正所谓:纸上得来终觉浅,绝知此事要躬行。
下面给出代码版本:
- Rocketmq release-4.5.0
- Netty 4.1
- Kafka 2.2
- JDK 1.8.0_74
小生不才,以上如有描述有误的地方还望各位不吝赐教 !_!
参考
如何让快递更"快"?菜鸟自研定时任务调度引擎首次公开
TimingWheel本质与DelayedOperationPurgatory核心结构
Kafka技术内幕样章 层级时间轮
Kafka解惑之时间轮(TimingWheel)
你真的了解延迟队列吗
定时器(Timer)的实现
二叉堆动画展示
二叉树