文章目录

1.业务场景2.定时任务(Quartz)2.1.依赖导入2.2.任务类2.3.任务调度类2.4.小结

3.延迟队列(DelayQueue)3.1.任务类3.2.测试案例3.3.日志输出3.4.小结

4.时间轮算法4.1.依赖导入4.2.任务类4.3.测试案例4.4.日志输出4.5.小结

5.Redis5.1.修改配置5.2.导入依赖5.3.测试案例5.4.日志输出5.5.小结

6.消息队列(RocketMQ)6.1.延迟级别6.2.生产者代码6.3.消费者代码6.4.小结

1.业务场景

在实际业务场景中,我们经常会碰到类似一下场景:

淘宝等购物平台在订单支付时,如果30分钟内未支付自动取消。腾讯会议预约会议后,在会议开始前15分钟提醒。未使用的优惠券有效期结束后,自动将优惠券状态更新为已过期。等等。。。

像这种支付超时取消的场景需求,其实有很多种实现方法,比如定时任务轮询、Java中的延时队列、时间轮算法、Redis过期监听等,如下图所示。

2.定时任务(Quartz)

Java中常见的定时任务框架包括 Quartz、Spring Task、Elastic-Job、XXL-Job等。下面将以 Quartz 为例实现业务场景(有关Elastic-Job 的使用可见 Elastic Job 开发使用篇)。

2.1.依赖导入

org.quartz-scheduler

quartz

2.3.2

2.2.任务类

@Slf4j

public class PaymentJob implements Job {

@Override

public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {

log.info("查询数据库获取超时支付订单,并取消该订单");

}

}

2.3.任务调度类

public class RoundRobin {

private static Scheduler defaultScheduler;

public void timedTask() {

// 创建任务明细

JobDetail jobDetail = JobBuilder.newJob(PaymentJob.class)

.withIdentity("支付超时取消订单任务", "payment_timeout_group")

.build();

// 创建触发器

Trigger trigger = TriggerBuilder.newTrigger()

.withDescription("这是支付超时取消订单任务触发器")

.startNow()

// 设置任务执行调度周期:cron表达式,每3秒执行一次

.withSchedule(CronScheduleBuilder.cronSchedule("0/3 * * * * ?"))

.build();

// 创建scheduler调度器

try {

if (defaultScheduler == null) {

synchronized (this) {

if (defaultScheduler == null) {

defaultScheduler = StdSchedulerFactory.getDefaultScheduler();

}

}

}

// 执行任务

defaultScheduler.scheduleJob(jobDetail, trigger);

defaultScheduler.start();

} catch (SchedulerException e) {

throw new RuntimeException(e);

}

}

}

2.4.小结

定时任务轮询的方式简单易行,但是这种方式也存在着显著的局限性:

1.在支付订单数量庞大的情况下,每次获取超时订单会走全表扫描,给数据库带来很大的IO负担和CPU占用,特别是这种需要小时间间隔任务轮询的全表扫描。其实这种也有不走全表扫描的方法,牺牲空间,就是对订单创建时间建立索引,设过期时间为 当前时间 - 30分钟(假设是超时时间),走索引查询过期时间之前的所有订单,最后执行取消订单的操作。

2.精度问题。如果将定时任务的时间间隔设置的比较长,会导致超时订单取消延迟较长,影响业务流程。如果间隔时间过于短,在大量订单的情况下,可能会出现大量重复订单,需要考虑并发问题和事务冲突。

3.延迟队列(DelayQueue)

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。当生产者线程调用插入元素的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间越晚。

3.1.任务类

@Slf4j

public class OrderDelay implements Delayed {

// 订单id

private String orderId;

// 超时的最后时刻(单位毫秒)

private long timeout;

public OrderDelay(String orderId, long timeout) {

this.orderId = orderId;

this.timeout = timeout+System.currentTimeMillis();

}

// 返回距离超时还剩多少毫秒

@Override

public long getDelay(TimeUnit unit) {

return unit.convert(timeout - System.currentTimeMillis(), TimeUnit.MILLISECONDS);

}

// 和其他的订单比较时间

@Override

public int compareTo(Delayed o) {

if (o == this) {

return 0;

} else {

OrderDelay t = (OrderDelay) o;

long l = this.timeout - t.timeout;

return l == 0 ? 0 : (l > 0 ? 1 : -1);

}

}

// 超时取消处理

public void timeoutCancel(){

log.info("订单{}超时,处理完毕",orderId);

}

}

3.2.测试案例

public class CancelTimeoutOrder {

public static void main(String[] args) {

// 先创建3个订单

OrderDelay o1 = new OrderDelay("1", 2 * 1000);

OrderDelay o2 = new OrderDelay("2", 4 * 1000);

OrderDelay o3 = new OrderDelay("3", 6 * 1000);

// 创建延迟队列

DelayQueue delayQueue = new DelayQueue<>();

delayQueue.put(o1);

delayQueue.put(o2);

delayQueue.put(o3);

// 开始判断订单

while (true){

try {

OrderDelay take = (OrderDelay) delayQueue.take();

take.timeoutCancel();

} catch (InterruptedException e) {

throw new RuntimeException(e);

}

}

}

}

3.3.日志输出

19:20:36.591 [main] INFO com.payment.demo.delay.OrderDelay - 订单1超时,处理完毕

19:20:38.588 [main] INFO com.payment.demo.delay.OrderDelay - 订单2超时,处理完毕

19:20:40.588 [main] INFO com.payment.demo.delay.OrderDelay - 订单3超时,处理完毕

3.4.小结

这种方式弥补了精度问题,并且任务处理更加高效,也不需要考虑多线程并发性的问题。但是所有订单都需要保留在内存,在大量订单的情况下会有很大的内存消耗,如果此时系统重启或者崩溃,那么剩余未处理的订单将会丢失。

4.时间轮算法

时间轮算法(Time Wheel Algorithm)是一种用于处理定时任务调度的算法,它使用循环数组和指针来实现,在每个时刻都有一个指针指向当前时间槽,每个时间槽中保存了需要执行的任务列表。时间轮算法的核心是轮询线程不再负责遍历所有任务,而是仅仅遍历时间刻度。

时间轮算法主要原理如下:

时间轮的构造:时间轮由多个槽(slot)组成,每个槽表示一个时间间隔。整个时间轮可以看作是一个环状结构,每个槽都有一个索引来标识。

时间轮的转动:时间轮按照固定的速度不断地转动,每次转动一个槽的间隔(例如,每秒转动一次)。

任务插入:当需要添加一个延迟任务时,根据任务的延迟时间,计算应该插入到哪个槽中。任务会被插入到离当前时间一定间隔的槽中。

任务触发:时间轮的每次转动都会检查当前位置的槽是否有任务,如果有,就执行任务。

时间轮的级联:如果有多个时间轮,可以将多个时间轮级联,即把一个时间轮的一个槽作为下一个时间轮的一个槽。这样可以扩展时间轮的范围和精度。

任务的删除:当延迟任务被取消或者执行完成时,需要从时间轮中删除对应的任务。

时间轮算法在实际应用中有很多用途,比如网络延迟调度、定时任务调度、消息队列等。通过合理地调整时间轮的大小和刻度,可以实现高效的任务调度和处理。

4.1.依赖导入

io.netty

netty-common

4.1.94.Final

4.2.任务类

@Slf4j

public class OrderTask implements TimerTask {

// 订单id

private String orderId;

public OrderTask(String orderId) {

this.orderId = orderId;

}

// 任务执行方法

@Override

public void run(Timeout timeout) throws Exception {

log.info("订单{}超时,处理完毕",orderId);

}

}

4.3.测试案例

public class TimeWheelUtil {

public static void main(String[] args) {

// 创建任务

OrderTask o1 = new OrderTask("1");

OrderTask o2 = new OrderTask("2");

OrderTask o3 = new OrderTask("3");

// 时间轮算法实现类

HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();

// 添加任务

hashedWheelTimer.newTimeout(o1,2, TimeUnit.SECONDS);

hashedWheelTimer.newTimeout(o2,4, TimeUnit.SECONDS);

hashedWheelTimer.newTimeout(o3,6, TimeUnit.SECONDS);

}

}

4.4.日志输出

20:04:17.611 [pool-1-thread-1] INFO com.payment.demo.wheel.OrderTask - 订单1超时,处理完毕

20:04:19.610 [pool-1-thread-1] INFO com.payment.demo.wheel.OrderTask - 订单2超时,处理完毕

20:04:21.612 [pool-1-thread-1] INFO com.payment.demo.wheel.OrderTask - 订单3超时,处理完毕

4.5.小结

时间轮算法其实和延迟队列比较相似。与延迟队列相比,其性能更优越,任务触发时间延迟时间更低,代码复杂度更简单。同样,由于信息存储于内存中,所以容易因为系统重启或宕机而丢失订单信息。

5.Redis

我们都知道 Redis 中的 key 可以设置过期时间,显而易见,通过设置过期时间然后监听这个 key 是否过期就能判断支付订单是否超时了。而Redis本身就具备key过期监听功能,即利用 Redis 的Keyspace Notifications功能,当一个 key 过期时,Redis 会向已订阅了相关 channel 的客户端发送一个通知。

5.1.修改配置

首先我们需要打开 redis.conf 文件,开启Keyspace Notifications功能,即修改如下配置。

notify-keyspace-events Ex

如图所示。

随后启动 redis 服务端。

5.2.导入依赖

redis.clients

jedis

3.3.0

5.3.测试案例

@Slf4j

public class RedisKeyNotify {

static JedisPool jedisPool = null;

public static void main(String args[]) throws InterruptedException {

new Thread(() -> {

// 配置redis连接

jedisPool = new JedisPool("localhost", 6379);

// 订阅redis的key过期通知

jedisPool.getResource().subscribe(new RedisSub(),"__keyevent@0__:expired");

}).start();

// 等待jedis初始化完

TimeUnit.SECONDS.sleep(1);

// 模拟一些数据

jedisPool.getResource().setex("1",3,"1");

jedisPool.getResource().setex("2",6,"2");

}

static class RedisSub extends JedisPubSub {

@Override

public void onMessage(String channel, String message) {

log.info("订单{}超时,处理完毕",message);

}

}

}

5.4.日志输出

23:50:57.500 [Thread-0] INFO com.payment.demo.redis.RedisKeyNotify - 订单1超时,处理完毕

23:51:00.382 [Thread-0] INFO com.payment.demo.redis.RedisKeyNotify - 订单2超时,处理完毕

5.5.小结

Redis的键过期事件处理机制天然支持高并发场景,只要Redis集群足够强大,可以轻松处理大量订单的过期处理。但是这种方式有一个很严重的弊端,在官方网站中有如下提醒:

Note: Redis Pub/Sub is fire and forget that is, if your Pub/Sub client disconnects, and reconnects later, all the events delivered during the time the client was disconnected are lost.

注意:Redis 的发布/订阅目前是即发即弃(fire and forget)模式的,也就是说,如果您的Pub/Sub客户端断开连接,稍后再重新连接,则客户端断开时传递的所有事件都将丢失。因此无法实现事件的可靠通知。

6.消息队列(RocketMQ)

延迟队列可以直接处理延迟消息,即消息在指定的延迟时间过后才被投递给消费者。在支付超时取消订单的场景中,订单创建时将订单信息封装成消息,并设置消息的延迟时间,当订单超时时,消息自动被投递到处理超时订单的队列,消费者接收到消息后执行取消操作。

以 RocketMQ 为例,在 RocketMQ 中没有延迟队列这一概念,但是我们可以通过延迟消息(Delayed Message)实现这一功能。有关RocketMQ的安装部署请移步 《RocketMQ安装部署+简单实战开发》

6.1.延迟级别

RocketMQ 一共支持18个等级的延时投递。

投递等级(delay level)延迟时间投递等级(delay level)延迟时间11s106min25s117min310s128min430s139min51min1410min62min1520min73min1630min84min171h95min182h

6.2.生产者代码

延时消息的实现逻辑需要先经过定时存储等待触发,延时时间到达后才会被投递给消费者。因此,如果将大量延时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度。

@Component

@Slf4j

public class DelayMsgSend {

/**

* 导入RocketMQ模版工具

*/

@Resource

private RocketMQTemplate rocketMQTemplate;

/**

* 发送延迟消息

*

* @param topic 主题

* @param msg 消息内容 (本次案例为支付订单id)

* @param timeout 超时时间(单位:毫秒)

* @param delayLevel 延迟级别

*/

public void sendDelayMsg(String topic, String msg, int timeout, int delayLevel) {

// 创建消息载体

Message build = MessageBuilder.withPayload(msg).build();

// 同步发送(也可以选择异步发送)

SendResult sendResult = rocketMQTemplate.syncSend(topic, build, timeout, delayLevel);

log.info("延迟消息发送成功。发送结果:{}",sendResult);

}

}

6.3.消费者代码

@Slf4j

@Component

@RocketMQMessageListener(

topic = "delay_topic",

consumerGroup = "order_consumer_group",

selectorType = SelectorType.TAG,

messageModel = MessageModel.CLUSTERING

)

public class DelayMsgConsumer implements RocketMQListener {

@Override

public void onMessage(String message) {

log.info("接收到订单id[{}]。判断是否超时,并执行相关逻辑",message);

}

}

6.4.小结

优点

订单创建、消息发送、支付取消等业务功能都是独立的,有利于系统的模块化和拓展。RocketMQ采用了多种机制保证消息的可靠性传输,如同步刷盘、主从复制等。这意味着一旦消息发送成功,将会被可靠地传输到消息队列中,不易丢失。RocketMQ具备高吞吐量的特点,能够处理大量的消息,并且能动态随订单量调整消费速度。 缺点

由于引入了消息中间件,所以会涉及到消息中间件配置和管理,增加了系统的复杂性。高度依赖消息中间件的可用性和稳定性。仍有小概率会丢失信息,这个也是不可避免的,任何方式都没有绝对保证。