延时队列相比于普通队列最大的区别就体现在其延时属性上 , 普通队列的元素是先进先出 , 按入队顺序进行处理 , 而延时队列中的元素在入队时会指定一个延迟时间 , 表示其希望能够在经过指定时间后处理 . 从某种意义上来讲 , 延迟队列的结构并不像一个队列 , 而更像一种以时间为权重的有序堆结构 .

应用场景

我们在一些业务场景中 , 经常会遇到一些需要经历一段时间后 , 或者达到某个时间节点才会执行的功能 . 就比如以下这些场景:

  • 外卖或打车在预订时间到达的前十分钟提醒骑手或者司机即将超时
  • 快递收货后在规定时间内用户没有确认收货会自动确认收货
  • 预订的会议在会议开始前十分钟会提醒你尽快加入会议
  • 每日周报在截止半小时前会提醒你尽快提交

为什么要使用延时队列

对于一些数据量小并且对数据的实时性不怎么要求的项目来说 , 最简单有效的办法就是写一个定时任务去扫描数据库以达到业务的实现 . 当然 , 如果在数据达到数百万或者千万级别时 , 如果去定时扫描数据库 , 是非常低效且消耗资源的 , 甚至对于那些时间间隔比较小的情景来说 , 上一次扫描还没结束下一次就要开始了. 这时候使用延时队列比较合适

实现延迟队列的几种途径

  • Quartz 定时任务
  • DelayQueue 延迟队列
  • Redis sorted set
  • Redis 过期键监听回调
  • RabbitMQ 死信队列
  • RabbitMQ 基于插件实现延迟队列
  • Wheel 时间轮训算法

这里先说下结论 , 然后再对每种途径做一些说明

中小项目建议用 Redisson 实现延迟队列 , 有一定规模的项目直接上 MQ

1. Quartz 定时任务

Quartz 是一款非常经典的任务调度框架 , 属于定时去扫表的做法 , 不太适合以上所说延时场景,这里不做展开

2. DelayQueue 延迟队列

JDK 中提供了一组实现延迟队列的 API , 位于 Java.util.concurrent 包下的 DelayQueue

DelayQueue 是一个 BlockingQueue 无界阻塞队列 , 它封装了一个 PriorityQueue (优先队列) , PriorityQueue 内部使用完全二叉堆来实现队列元素排序 , 我们在向 DelayQueue 队列中添加元素时 , 会给元素一个延迟时间作为排序条件 , 队列中最小的元素会优先排到队首. 队列中的元素只有到了延迟时间才允许从队列中取出 . 队列中可以放基本数据类型或自定义实体类 , 在存在基本数据类型时 , 优先队列中元素默认升序排序 , 自定义实体类就需要我们根据类属性值比较计算.

这里简单添加三个order入队 , 分别设置在当前时间 5 秒 /10 秒 /15秒后取消. 要实现 DelayQueue 延迟队列 , 队中元素要实现 Delayed 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Order implements Delayed {
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;

public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}

@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}

DelayQueue的put方法是线程安全的,因为put方法内部使用了ReentrantLock锁进行线程同步。DelayQueue还提供了两种出队的方法poll()和take() , poll()为非阻塞获取,没有到期的元素直接返回null;take()阻塞方式获取,没有到期的元素线程将会等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class DelayQueueDemo {

public static void main(String[] args) throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);

System.out.println("订单延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
/**
* 取队列头部元素是否过期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name,
LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}

上边只是简单的实现入队与出队的操作,实际开发中会有专门的线程,负责消息的入队与消费。
执行后看到结果如下,Order1、Order2、Order3 分别在 5秒、10秒、15秒后被执行,至此就用DelayQueue实现了延时队列。
订单延迟队列开始时间:2020-05-06 14:59:09
订单:{Order1}被取消, 取消时间:{2020-05-06 14:59:14}

订单:{Order2}被取消, 取消时间:{2020-05-06 14:59:19}

订单:{Order3}被取消, 取消时间:{2020-05-06 14:59:24}

__由于此种方式只是内存处理 , 没有持久化的设计 , 不太适合生产 , 所以不建议采用 __

3. Redis sorted set

在Redis中,zet作为有序集合,可以利用其有序的特性,将任务添加到zset中,将任务的到期时间作为score,利用zset的默认有序特性,获取score值最小的元素(也就是最近到期的任务),判断系统时间与该任务的到期时间大小,如果达到到期时间,就执行业务,并删除该到期任务,继续判断下一个元素,如果没有到期,就sleep一段时间(比如1秒),如果集合为空,也sleep一段时间。

通过zadd命令向队列delayqueue中添加元素,并设置score值表示元素过期的时间;向delayqueue添加三个order1、order2、order3,分别是10秒、20秒、30秒后过期。

zadd delayqueue 3 order3
消费端轮询队列delayqueue,将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除key。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 消费消息
*/
public void pollOrderQueue() {

while (true) {
Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}

我们看到执行结果符合预期:

1
2
3
4
5
2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty

4. Redis 过期键监听回调

在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变化。

这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知。

这些默认的channel被分为两类:

  • __keyspace@<db>__:为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件。

    举个例子,现在有个消费者监听了__keyspace@0__:sanyou这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息

  • __keyevent@<db>__:为前缀,后面跟的是消息事件类型,表示监听某个事件

    同样举个例子,现在有个消费者监听了__keyevent@0__:expired这个channel,代表了监听key的过期事件。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息。如果把expired换成del,那么监听的就是删除事件。具体支持哪些事件,可从官网查。

上述db是指具体的数据库,Redis不是默认分为16个库么,序号从0-15,所以db就是0到15的数字,示例中的0就是指0对应的数据库。

延迟队列实现原理

通过对上面的两个概念了解之后,应该就对监听过期key的实现原理一目了然了,其实就是当这个key过期之后,Redis会发布一个key过期的事件到__keyevent@<db>__:expired这个channel,只要我们的服务监听这个channel,那么就能知道过期的Key,从而就算实现了延迟队列功能。

所以这种方式实现延迟队列就只需要两步:

  • 发送延迟任务,key是延迟消息本身,过期时间就是延迟时间
  • 监听__keyevent@<db>__:expired这个channel,处理延迟任务

实现

首先修改 Redis 的配置

1
2
# 修改redis.conf文件开启notify-keyspace-events Ex。
notify-keyspace-events Ex

引入 pom

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Configuration
public class RedisConfiguration {

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(connectionFactory);
return redisMessageListenerContainer;
}

@Bean
public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
}

}

KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听 , 当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件

所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息。

对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener

1
2
3
4
5
6
7
8
9
10
@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {

@Override
public void onApplicationEvent(RedisKeyExpiredEvent event) {
byte[] body = event.getSource();
System.out.println("获取到延迟消息:" + new String(body));
}

}

代码写好,启动应用

之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s

1
2
set sanyou task 
expire sanyou 5

期望的结果是5s后控制台会打印 , 但是测试多次后发现不是稳定得到期望结果

查阅官网后

上面这段话主要讨论的是key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略,就是面试八股文常背的两种:

  • 惰性清除。当这个key过期之后,访问时,这个Key才会被清除
  • 定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除

再后面那段话是核心,意思是说,key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布。

到这我终于明白了,上面的例子中即使我设置了5s的过期时间,但是当5s过去之后,只要两种清除策略都不满足,没人访问sanyou这个key,后台的定时清理的任务也没扫描到sanyou这个key,那么就不会发布key过期的事件,自然而然也就监听不到了。

至于后台的定时清理的任务什么时候能扫到,这个没有固定时间,可能一到过期时间就被扫到,也可能等一定时间才会被扫到,这就可能会造成了客户端从发布到监听到的消息时间差会大于等于过期时间,从而造成一定时间消息的延迟.

除了上面的问题 , 还有一些进一步的问题

1. 丢消息太频繁

Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息。

Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。

所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。

2. 消息消费只有广播模式

Redis的发布订阅模式消息消费只有广播模式一种。

所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。

生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息。

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。

3. 接收到所有key的某个事件

这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题。

当消费者监听了以__keyevent@<db>__:开头的消息,那么会导致所有的key发生了事件都会被通知给消费者。

举个例子,某个消费者监听了__keyevent@*__:expired这个channel,那么只要key过期了,不管这个key是张三还会李四,消费者都能收到。

所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务。

综上 , 这种方式也不太合适

5. RabiitMQ 延时队列

利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTL和 DXL这两个属性间接实现的。

先来认识一下 TTL和 DXL两个概念:
Time To Live(TTL) :
TTL 顾名思义:指的是消息的存活时间,RabbitMQ可以通过x-message-tt参数来设置指定Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ 可以从两种维度设置消息过期时间,分别是队列和消息本身
设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息TTL都可以不同。
如果同时设置队列和队列中消息的TTL,则TTL值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过TTL过期时间则成为Dead Letter(死信)。

Dead Letter Exchanges(DLX)

DLX即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQ的 Queue(队列)可以配置两个参数x-dead-letter-exchange 和 x-dead-letter-routing-key(可选),一旦队列内出现了Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个Exchange(交换机),让消息重新被消费。
x-dead-letter-exchange:队列中出现Dead Letter后将Dead Letter重新路由转发到指定 exchange(交换机)。
x-dead-letter-routing-key:指定routing-key发送,一般为要指定转发的队列。

队列出现Dead Letter的情况有:

消息或者队列的TTL过期
队列达到最大长度
消息被消费端拒绝(basic.reject or basic.nack)

下边结合一张图看看如何实现超30分钟未支付关单功能,我们将订单消息A0001发送到延迟队列order.delay.queue,并设置x-message-tt消息存活时间为30分钟,当到达30分钟后订单消息A0001成为了Dead Letter(死信),延迟队列检测到有死信,通过配置x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

发送消息时指定消息延迟的时间

1
2
3
4
5
6
7
8
public void send(String delayTimes) {
amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延迟数据", message -> {
// 设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
}
}

设置延迟队列出现死信后的转发规则

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 延时队列
*/
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
return QueueBuilder
.durable(RabbitConstant.DEAD_LETTER_QUEUE)
// 配置到期后转发的交换
.withArgument("x-dead-letter-exchange", "order.close.exchange")
// 配置到期后转发的路由键
.withArgument("x-dead-letter-routing-key", "order.close.queue")
.build();
}

6. 时间轮

前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。kafka、netty都有基于时间轮算法实现延时队列,下边主要实践Netty的延时队列讲一下时间轮是什么原理。

wheel :时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈round 长度为24秒,刻度数为 8,那么每一个刻度表示 3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。

当添加一个定时、延时任务A,假如会延迟25秒后才会执行,可时间轮一圈round 的长度才24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round和对应的指针位置 index,也是就任务A会绕一圈指向0格子上,此时时间轮会记录该任务的round和 index信息。当round=0,index=0 ,指针指向0格子 任务A并不会执行,因为 round=0不满足要求。
所以每一个格子代表的是一些时间,比如1秒和25秒 都会指向0格子上,而任务则放在每个格子对应的链表中,这点和HashMap的数据有些类似。

Netty构建延时队列主要用HashedWheelTimer,HashedWheelTimer底层数据结构依然是使用DelayedQueue,只是采用时间轮的算法来实现。
下面我们用Netty 简单实现延时队列,HashedWheelTimer构造函数比较多,解释一下各参数的含义。
ThreadFactory :表示用于生成工作线程,一般采用线程池;
tickDuration和unit:每格的时间间隔,默认100ms;

ticksPerWheel:一圈下来有几格,默认512,而如果传入数值的不是2的N次方,则会调整为大于等于该参数的一个2的N次方数值,有利于优化hash值的计算。

1
2
3
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}

TimerTask:一个定时任务的实现接口,其中run方法包装了定时任务的逻辑。
Timeout:一个定时任务提交到Timer之后返回的句柄,通过这个句柄外部可以取消这个定时任务,并对定时任务的状态进行一些基本的判断。
Timer:是HashedWheelTimer实现的父接口,仅定义了如何提交定时任务和如何停止整个定时机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class NettyDelayQueue {

public static void main(String[] args) {

final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
//定时任务
TimerTask task1 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order1 5s 后执行 ");
timer.newTimeout(this, 5, TimeUnit.SECONDS);//结束时候再次注册
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
TimerTask task2 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order2 10s 后执行");
timer.newTimeout(this, 10, TimeUnit.SECONDS);//结束时候再注册
}
};

timer.newTimeout(task2, 10, TimeUnit.SECONDS);

//延迟任务
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order3 15s 后执行一次");
}
}, 15, TimeUnit.SECONDS);

}
}

从执行的结果看,order3、order3延时任务只执行了一次,而order2、order1为定时任务,按照不同的周期重复执行。
order1 5s 后执行
order2 10s 后执行
order3 15s 后执行一次
order1 5s 后执行
order2 10s 后执行

7. Redisson

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。

先来个demo,后面再来说说这种实现的原理。

demo

引入 pom

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.1</version>
</dependency>

封装了一个RedissonDelayQueue类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Component
@Slf4j
public class RedissonDelayQueue {

private RedissonClient redissonClient;

private RDelayedQueue<String> delayQueue;
private RBlockingQueue<String> blockingQueue;

@PostConstruct
public void init() {
initDelayQueue();
startDelayQueueConsumer();
}

private void initDelayQueue() {
Config config = new Config();
SingleServerConfig serverConfig = config.useSingleServer();
serverConfig.setAddress("redis://localhost:6379");
redissonClient = Redisson.create(config);

blockingQueue = redissonClient.getBlockingQueue("SANYOU");
delayQueue = redissonClient.getDelayedQueue(blockingQueue);
}

private void startDelayQueueConsumer() {
new Thread(() -> {
while (true) {
try {
String task = blockingQueue.take();
log.info("接收到延迟任务:{}", task);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "SANYOU-Consumer").start();
}

public void offerTask(String task, long seconds) {
log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);
delayQueue.offer(task, seconds, TimeUnit.SECONDS);
}

}

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓。

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。

添加任务的时候是通过RDelayedQueue的offer方法添加的。

controller类,通过接口添加任务,延迟时间为5s

1
2
3
4
5
6
7
8
9
10
11
12
@RestController
public class RedissonDelayQueueController {

@Resource
private RedissonDelayQueue redissonDelayQueue;

@GetMapping("/add")
public void addTask(@RequestParam("task") String task) {
redissonDelayQueue.offerTask(task, 5);
}

}

访问接口静待5s ,获取到数据

实现原理

如下图就是上面demo中,一个延迟队列会在Redis内部使用到的channel和数据类型

SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

  • redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
  • redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。
  • SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
  • redisson_delay_queue_channel:SANYOU,是一个channel,用来通知客户端开启一个延迟任务

有了这些概念之后,再来看看整体的运行原理图

  • 生产者在提交任务的时候将任务放到redisson_delay_queue_timeout:SANYOU中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳
  • 客户端会有一个延迟任务,为了区分,后面我都说是客户端延迟任务。这个延迟任务会向Redis Server发送一段lua脚本,Redis执行lua脚本中的命令,并且是原子性的

这段lua脚本主要干了两件事:

  • 将到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列
  • 获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期时间的延迟任务的到期时间戳,然后发布到redisson_delay_queue_channel:SANYOU这个channel中

当客户端监听到redisson_delay_queue_channel:SANYOU这个channel的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到过期时间的延迟任务的到期时间戳)- 当前时间戳,这个时间其实也就是redisson_delay_queue_channel:SANYOU中最早到过期时间的任务还剩余的延迟时间。

此处可以等待10s,好好想想。。

这样,一旦时间来到了上面说的最早到过期时间任务的到期时间戳,redisson_delay_queue_timeout:SANYOU中上面说的最早到过期时间的任务已经到期了,客户端的延迟任务也同时到期,于是开始执行lua脚本操作,及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中,如此循环往复,一直运行下去,保证redisson_delay_queue_timeout:SANYOU中到期的数据能及时放到目标队列中。

所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列。

这里再补充两个特殊情况,图中没有画出:

第一个就是如果redisson_delay_queue_timeout:SANYOU是新添加的任务(队列之前有或者没有任务)是队列中最早需要被执行的,也会发布消息到channel,之后就按时上面说的流程走了。

添加任务代码如下,也是通过lua脚本来的

第二种特殊情况就是项目启动的时候会执行一次客户端延迟任务。项目在重启时,由于没有客户端延迟任务的执行,可能会出现redisson_delay_queue_timeout:SANYOU队列中有到期但是没有被放到目标队列的可能,重启就执行一次就是为了保证到期的数据能被及时放到目标队列中。

Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟。

丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据。

广播消费任务的问题,这个是不会出现的,因为每个客户端都是从同一个目标队列中获取任务的。

综上,Redisson 这种实现方案是比较合适的

参考链接