订单失效,自动关单 RabbitMQ实现
最近在做电商项目部分功能重构,在重构时遇到一个关于超时订单关单的问题,在这里记录总结下~
问题:订单超时未支付或者订单失效怎么去关闭订单
超时订单处理比较麻烦的地方就是实时的获取订单状态
对于上述问题一般有两种处理方案:1.定时任务 2.延时任务
定时任务和延时任务的区别在哪
一般的区别有一下几点:
- 定时任务有明确的触发时间,延时任务没有
- 定时任务有执行周期,而延时任务是由某个事物触发,没有执行周期
- 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务
定时任务处理订单失效的问题
用户下订单后生成订单信息,然后再把订单加入到定时任务中(设置xxx时间后执行),当到达指定时间后检查订单状态,如果该订单未支付则把订单状态标识为失效。
延时任务处理订单失效的问题
当用户下订单后,将用户的订单的标识全部发送到延时队列中,30分钟后进去消费队列中被消费,消费时先检查该订单的状态,如果未支付则标识该订单失效。
有以下几种延时任务处理方法:
-
Java自带的DelayedQueue队列
这是JDK自带一个无阻塞队列,项目业务不复杂可以考虑这种方式。它是使用jvm内存来实现的,停机会丢失数据,扩展性不高 -
使用redis监听key过期
当用户下订单后把订单信息设置为redis的key,设置过期时间,程序编写监听redis的key失效,然后处理订单(最开始我们这就是采用的这种技术方案)。这种方式最大的弊端就是只能监听一台redis的key失效,集群下将无法实现,当初做技术讨论的时候有说过监听集群下的每个redis节点的key,但我认为这样做很不合适,复杂都比较高,难维护。如果项目业务复杂性不高,redis单机部署,就可以考虑这种方式。 -
使用MQ的方式,这里我们讨论RabbitMQ里的死信队列实现方案。
RabbitMQ死信队列实现监听订单失效
AMQP协议和RabbitMQ本身是不支持延时队列功能,但我们能模拟出延时消息的功能。(可以大概看下我画的草图)
- Time To Live(TTL)
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
B: 对消息进行单独设置,每条消息TTL可以不同。
- Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
为了验证这一效果,我们下面去做个例子去模拟实现订单超时自动触发关单操作,为了效果,我这里把过期时间改成了15s
具体实现
开发环境/版本一览
- 开发工具:Intellij IDEA 2021.3.3
- springboot:2.3.3
- jdk:11.0.14
- maven: 3.5.4
- RabbitMQ:3.8.14
docker安装RabbitMQ我后面会再出一个文章讨论下
项目结构
项目搭建
- 生成项目工程,配置pom.xml加入依赖 我这里idea自动添加了lombok,我们就多引入一下
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
- 配置文件 application.yml 加入RabbitMQ的相关配置
server:
port: 3000
spring:
rabbitmq:
host: ip
port: 5672
virtual-host: /
password: 123456
username: admin
#开启手动确认消费消息
listener:
simple:
acknowledge-mode: manual
- 然后需要配置RabbitMQ的队列、交换机和路由key的信息 创建RabbitConfig
package top.myfanfou.rabbitmqdelayqueue.config;
import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Exchange;
import java.util.HashMap;
import java.util.Map;
/**
* @Author: laasc
* @Date: 2022/3/30 15:02
* @Description: RabbitMQ配置信息
* @Version: 1.0
*/
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交换机名称
*/
public static final String eventExchange = "order_event_exchange";
/**
* 死信交换机名称
*/
public static final String dealEventExchange = "order_event_exchange";
/**
* 死信队列名称
*/
public static final String dealQueueOrder = "deal_queue_order";
/**
* 延迟队列名称
*/
public static final String orderDelayQueue = "order_delay_queue";
/**
* 进入延迟队列路由key
*/
public static final String orderDelayQueueRoutingKey = "order_delay_queue_routing_key";
/**
* 进入到死信队列路由key
*/
public static final String dealQueueOrderRoutingKey = "deal_queue_order_routing_key";
/**
* 死信队列 交换机标识符
*/
public final static String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列 路由key
*/
public final static String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 声明交换机 Topic类型 也可使用dirct路由
* @return
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange(eventExchange, true, false);
}
/**
* 消息转换器
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 创建延迟队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
Map<String, Object> args = new HashMap<>(3);
// args.put("x-message-ttl", 15 * 1000); // 直接设置延迟队列进入死信队列时间
args.put(DEAD_LETTER_ROUTING_KEY, dealQueueOrderRoutingKey);
args.put(DEAD_LETTER_QUEUE_KEY, dealEventExchange);
return new Queue(orderDelayQueue, true, false,false, args);
}
/**
* 创建死信队列
* @return
*/
@Bean
public Queue dealQueueOrder() {
return new Queue(dealQueueOrder, true, false,false);
}
/**
* 死信队列绑定交换机路由
* @return
*/
@Bean
public Binding dealQueueOrderBinding() {
return new Binding(dealQueueOrder, Binding.DestinationType.QUEUE, eventExchange,dealQueueOrderRoutingKey, null);
}
/**
* 延迟队列绑定交换机路由
* @return
*/
@Bean
public Binding orderDelayQueueBinding() {
return new Binding(orderDelayQueue, Binding.DestinationType.QUEUE, dealEventExchange,orderDelayQueueRoutingKey, null);
}
}
- 配置消息生产者
package top.myfanfou.rabbitmqdelayqueue.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.myfanfou.rabbitmqdelayqueue.config.RabbitMQConfig;
import top.myfanfou.rabbitmqdelayqueue.model.OrderMessage;
import java.util.UUID;
/**
* @Author: laasc
* @Date: 2022/3/30 16:45
* @Description: 订单接口控制器
* @Version: 1.0
*/
@RestController
@RequestMapping("/api/order/v1")
@Slf4j
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMQConfig rabbitMqConfig;
/**
* 摸拟订单提交
* @return
*/
@GetMapping("test/submit_order")
public Object submit() {
String orderId = UUID.randomUUID().toString();
OrderMessage orderMessage = new OrderMessage();
orderMessage.setOutTradeNo(orderId);
log.info("发送到mq的订单号=====》: {}", orderId);
/**
* 1.发送到交换机的名称
* 2.消息对象
* 3.配置过期时间
*/
rabbitTemplate.convertAndSend(RabbitMQConfig.eventExchange, RabbitMQConfig.orderDelayQueueRoutingKey, orderMessage, message -> {
// 如果配置了 params.put("x-message-ttl", 5 * 1000); 这一句可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
message.getMessageProperties().setExpiration(1000 * 15 + "");
return message;
});
return "{'order_id': '"+ orderId +"'}";
}
}
- 消息实体类封装
package top.myfanfou.rabbitmqdelayqueue.model;
/**
* @Author: laasc
* @Date: 2022/3/30 16:54
* @Description: mq订单消息类 可以定义想要的属性
*/
public class OrderMessage {
public String getOutTradeNo() {
return outTradeNo;
}
public void setOutTradeNo(String outTradeNo) {
this.outTradeNo = outTradeNo;
}
/**
* 订单号
*/
private String outTradeNo;
}
- 配置消息消费者
package top.myfanfou.rabbitmqdelayqueue.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import top.myfanfou.rabbitmqdelayqueue.config.RabbitMQConfig;
import top.myfanfou.rabbitmqdelayqueue.model.OrderMessage;
import java.io.IOException;
/**
* @Author: laasc
* @Date: 2022/3/30 17:18
* @Description: mq消费监听器
*/
@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.dealQueueOrder)
public class OrderFailureListener {
/**
* 延时关单监听器
* @param orderMessage
* @param message
* @param channel
*/
@RabbitHandler
public void closeOrder(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
log.info("监听到延时消息订单号========》:{}", orderMessage.getOutTradeNo());
// 手动确认消息
long msgTag = message.getMessageProperties().getDeliveryTag();
try {
// 判断是否支付,如果支付取消关单,未支付关闭订单 这里可以调用其他接口完成订单的状态确认,我这里直接就写未完成订单 TODO
boolean flag = true;
if(flag) {
// 确认消费
channel.basicAck(msgTag, false);
log.info("延时订单消费成功");
}else {
// 重新投递
channel.basicReject(msgTag, true);
}
}catch (IOException e) {
log.error("延时关单异常:{}", orderMessage);
channel.basicReject(msgTag, true);
}
}
}
- 配置启动类
package top.myfanfou.rabbitmqdelayqueue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqDelayQueueApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqDelayQueueApplication.class, args);
}
}
启动测试
启动成功后我们访问一下配置的接口模拟提交订单接口:http://localhost:3000/api/order/v1/test/submit_order
15s后查看控制台
芜湖芜湖~起飞?️了,大功告成,可以看出来我们代码只需要关注逻辑方面,只需要确保消息能正常投递到mq,然后能监听到mq的消费队列,剩下全部都是mq完成的。
可以说实时性已经非常高了,可以试着多提交几次订单测试数据的流转,如果觉得时间过快,可以把时间稍微设置的长一点。
我们打开RabbitMQ的ui界面,可以看到我们多出两条队列。我这里之前做测试使用,队列有点多,可能看的不是很明白,如果是新装的mq,那这里只有两条记录。
我们使用这种方式,不止是可以做任务失效处理,比如新人注册优惠券的发放、优惠券的过期等等问题,然后可以集群部署RabbitMQ,做消息确认机制,消息的消费模式也有其他模式配置,如果有兴趣可以看下官方文档。
项目完整代码
- 完整的代码demo已经推送到github代码地址https://github.com/laasc/rabbitmq-delay-queue.git。
评论区