侧边栏壁纸
博主头像
laasc

Coding changes the world

  • 累计撰写 7 篇文章
  • 累计创建 9 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

延时任务处理订单失效【RabbitMQ死信队列实现】

laasc
2022-03-30 / 0 评论 / 8 点赞 / 922 阅读 / 2,664 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2023-08-07,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

订单失效,自动关单 RabbitMQ实现

最近在做电商项目部分功能重构,在重构时遇到一个关于超时订单关单的问题,在这里记录总结下~

问题:订单超时未支付或者订单失效怎么去关闭订单

超时订单处理比较麻烦的地方就是实时的获取订单状态

对于上述问题一般有两种处理方案:1.定时任务 2.延时任务

定时任务和延时任务的区别在哪

一般的区别有一下几点:

  • 定时任务有明确的触发时间,延时任务没有
  • 定时任务有执行周期,而延时任务是由某个事物触发,没有执行周期
  • 定时任务一般执行的是批处理操作是多个任务,而延时任务一般是单个任务

定时任务处理订单失效的问题

用户下订单后生成订单信息,然后再把订单加入到定时任务中(设置xxx时间后执行),当到达指定时间后检查订单状态,如果该订单未支付则把订单状态标识为失效。

延时任务处理订单失效的问题

当用户下订单后,将用户的订单的标识全部发送到延时队列中,30分钟后进去消费队列中被消费,消费时先检查该订单的状态,如果未支付则标识该订单失效。

有以下几种延时任务处理方法:

  • Java自带的DelayedQueue队列
    这是JDK自带一个无阻塞队列,项目业务不复杂可以考虑这种方式。它是使用jvm内存来实现的,停机会丢失数据,扩展性不高

  • 使用redis监听key过期
    当用户下订单后把订单信息设置为redis的key,设置过期时间,程序编写监听redis的key失效,然后处理订单(最开始我们这就是采用的这种技术方案)。这种方式最大的弊端就是只能监听一台redis的key失效,集群下将无法实现,当初做技术讨论的时候有说过监听集群下的每个redis节点的key,但我认为这样做很不合适,复杂都比较高,难维护。如果项目业务复杂性不高,redis单机部署,就可以考虑这种方式。

  • 使用MQ的方式,这里我们讨论RabbitMQ里的死信队列实现方案。

RabbitMQ死信队列实现监听订单失效

AMQP协议和RabbitMQ本身是不支持延时队列功能,但我们能模拟出延时消息的功能。(可以大概看下我画的草图)

c2d7e5335fe3224d3fe1350875ed3d4.png

  • Time To Live(TTL)
    RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
B: 对消息进行单独设置,每条消息TTL可以不同。

  • Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

为了验证这一效果,我们下面去做个例子去模拟实现订单超时自动触发关单操作,为了效果,我这里把过期时间改成了15s

具体实现

开发环境/版本一览

  • 开发工具:Intellij IDEA 2021.3.3
  • springboot:2.3.3
  • jdk:11.0.14
  • maven: 3.5.4
  • RabbitMQ:3.8.14

docker安装RabbitMQ我后面会再出一个文章讨论下

项目结构

c95e8677872722974257a6ad186d62b.png

项目搭建

  • 生成项目工程,配置pom.xml加入依赖 我这里idea自动添加了lombok,我们就多引入一下
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
</dependency>
  • 配置文件 application.yml 加入RabbitMQ的相关配置
server:
  port: 3000

spring:
  rabbitmq:
    host: ip
    port: 5672
    virtual-host: /
    password: 123456
    username: admin
    #开启手动确认消费消息
    listener:
      simple:
        acknowledge-mode: manual
  • 然后需要配置RabbitMQ的队列、交换机和路由key的信息 创建RabbitConfig
package top.myfanfou.rabbitmqdelayqueue.config;

import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Exchange;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author: laasc
 * @Date: 2022/3/30 15:02
 * @Description: RabbitMQ配置信息
 * @Version: 1.0
 */
@Configuration
@Data
public class RabbitMQConfig {
  /**
   * 交换机名称
   */
  public static final String eventExchange = "order_event_exchange";

  /**
   * 死信交换机名称
   */
  public static final String dealEventExchange = "order_event_exchange";

  /**
   * 死信队列名称
   */
  public static final String dealQueueOrder = "deal_queue_order";

  /**
   * 延迟队列名称
   */
  public static final String orderDelayQueue = "order_delay_queue";

  /**
   * 进入延迟队列路由key
   */
  public static final String orderDelayQueueRoutingKey = "order_delay_queue_routing_key";

  /**
   * 进入到死信队列路由key
   */
  public static final String dealQueueOrderRoutingKey = "deal_queue_order_routing_key";

  /**
   * 死信队列 交换机标识符
   */
  public final static String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";

  /**
   * 死信队列 路由key
   */
  public final static String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

  @Autowired
  private CachingConnectionFactory connectionFactory;


  /**
   * 声明交换机 Topic类型 也可使用dirct路由
   * @return
   */
  @Bean
   public Exchange orderEventExchange() {
    return  new TopicExchange(eventExchange, true, false);
  }


  /**
   * 消息转换器
   * @return
   */
  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  /**
   * 创建延迟队列
   * @return
   */
  @Bean
  public Queue orderDelayQueue() {
    Map<String, Object> args = new HashMap<>(3);
    // args.put("x-message-ttl", 15 * 1000); // 直接设置延迟队列进入死信队列时间
    args.put(DEAD_LETTER_ROUTING_KEY, dealQueueOrderRoutingKey);
    args.put(DEAD_LETTER_QUEUE_KEY, dealEventExchange);
    return new Queue(orderDelayQueue, true, false,false, args);
  }


  /**
   * 创建死信队列
   * @return
   */
  @Bean
  public Queue dealQueueOrder() {
    return new Queue(dealQueueOrder, true, false,false);
  }

  /**
   * 死信队列绑定交换机路由
   * @return
   */
  @Bean
  public Binding dealQueueOrderBinding() {
    return new Binding(dealQueueOrder, Binding.DestinationType.QUEUE, eventExchange,dealQueueOrderRoutingKey, null);
  }

  /**
   * 延迟队列绑定交换机路由
   * @return
   */
  @Bean
  public Binding orderDelayQueueBinding() {
    return new Binding(orderDelayQueue, Binding.DestinationType.QUEUE, dealEventExchange,orderDelayQueueRoutingKey, null);
  }
}

  • 配置消息生产者
package top.myfanfou.rabbitmqdelayqueue.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.myfanfou.rabbitmqdelayqueue.config.RabbitMQConfig;
import top.myfanfou.rabbitmqdelayqueue.model.OrderMessage;

import java.util.UUID;

/**
 * @Author: laasc
 * @Date: 2022/3/30 16:45
 * @Description: 订单接口控制器
 * @Version: 1.0
 */

@RestController
@RequestMapping("/api/order/v1")
@Slf4j
public class OrderController {

  @Autowired
  private RabbitTemplate rabbitTemplate;

  @Autowired
  private RabbitMQConfig rabbitMqConfig;


  /**
   * 摸拟订单提交
   * @return
   */
  @GetMapping("test/submit_order")
  public Object submit() {

    String orderId = UUID.randomUUID().toString();
    OrderMessage orderMessage = new OrderMessage();
    orderMessage.setOutTradeNo(orderId);
    log.info("发送到mq的订单号=====》: {}", orderId);

    /**
     * 1.发送到交换机的名称
     * 2.消息对象
     * 3.配置过期时间
     */
    rabbitTemplate.convertAndSend(RabbitMQConfig.eventExchange, RabbitMQConfig.orderDelayQueueRoutingKey, orderMessage, message -> {
      // 如果配置了 params.put("x-message-ttl", 5 * 1000); 这一句可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
      message.getMessageProperties().setExpiration(1000 * 15 + "");
      return message;
    });

    return  "{'order_id': '"+ orderId +"'}";
  }

}

  • 消息实体类封装
package top.myfanfou.rabbitmqdelayqueue.model;

/**
 * @Author: laasc
 * @Date: 2022/3/30 16:54
 * @Description: mq订单消息类 可以定义想要的属性
 */
public class OrderMessage {

  public String getOutTradeNo() {
    return outTradeNo;
  }

  public void setOutTradeNo(String outTradeNo) {
    this.outTradeNo = outTradeNo;
  }

  /**
   * 订单号
   */
  private String outTradeNo;

}
  • 配置消息消费者
package top.myfanfou.rabbitmqdelayqueue.listener;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import top.myfanfou.rabbitmqdelayqueue.config.RabbitMQConfig;
import top.myfanfou.rabbitmqdelayqueue.model.OrderMessage;

import java.io.IOException;

/**
 * @Author: laasc
 * @Date: 2022/3/30 17:18
 * @Description: mq消费监听器
 */
@Component
@Slf4j
@RabbitListener(queues = RabbitMQConfig.dealQueueOrder)
public class OrderFailureListener {

  /**
   * 延时关单监听器
   * @param orderMessage
   * @param message
   * @param channel
   */
  @RabbitHandler
  public void closeOrder(OrderMessage orderMessage, Message message, Channel channel) throws IOException {
    log.info("监听到延时消息订单号========》:{}", orderMessage.getOutTradeNo());

    // 手动确认消息
    long msgTag = message.getMessageProperties().getDeliveryTag();

    try {
      // 判断是否支付,如果支付取消关单,未支付关闭订单 这里可以调用其他接口完成订单的状态确认,我这里直接就写未完成订单 TODO
      boolean flag = true;
      if(flag) {
        // 确认消费
        channel.basicAck(msgTag, false);
        log.info("延时订单消费成功");
      }else {
        // 重新投递
        channel.basicReject(msgTag, true);
      }

    }catch (IOException e) {
      log.error("延时关单异常:{}", orderMessage);
      channel.basicReject(msgTag, true);
    }

  }

}

  • 配置启动类
package top.myfanfou.rabbitmqdelayqueue;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqDelayQueueApplication {

  public static void main(String[] args) {
    SpringApplication.run(RabbitmqDelayQueueApplication.class, args);
  }

}

启动测试

启动成功后我们访问一下配置的接口模拟提交订单接口:http://localhost:3000/api/order/v1/test/submit_order

b2f0706889b613486310b6d438d16fb.png

15s后查看控制台
bff97ba9784225d3463c56fe7084d84.png

芜湖芜湖~起飞?️了,大功告成,可以看出来我们代码只需要关注逻辑方面,只需要确保消息能正常投递到mq,然后能监听到mq的消费队列,剩下全部都是mq完成的。

可以说实时性已经非常高了,可以试着多提交几次订单测试数据的流转,如果觉得时间过快,可以把时间稍微设置的长一点。

我们打开RabbitMQ的ui界面,可以看到我们多出两条队列。我这里之前做测试使用,队列有点多,可能看的不是很明白,如果是新装的mq,那这里只有两条记录。

d1f840a9b5d88e6928e5c8d6428d4d0.png

我们使用这种方式,不止是可以做任务失效处理,比如新人注册优惠券的发放、优惠券的过期等等问题,然后可以集群部署RabbitMQ,做消息确认机制,消息的消费模式也有其他模式配置,如果有兴趣可以看下官方文档。

项目完整代码

8

评论区