谷粒商城-高级-75 -商城业务-订单服务-RabbitMQ 延时队列
一、RabbitMQ延时队列
RabbitMQ延时队列实现定时任务。
场景:
比如未付款订单,超过一定时间后,系统自动取消订单并释放占有的库存。
常用解决方案:
spring的schedule定时任务轮训数据库
缺点:
消耗系统内存,增加了数据库的压力、存在较大的时间误差
解决:
Rabbit的消息 TTL 和私信Exchange结合。
消息的TTL
消息的TTL(Time To Live)就是消息的存活时间,单位是毫秒。。
RabbitMQ 可以对队列和消息分别设置TTL。
- 对队列设置就是队列没有消费者连着的保留时间,
也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就是死了,称之为死信
。 - 如果队列设置了,消息也设置了,那么
会取小
的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration 字段或者 x-message-ttl 属性来设置时间
,两者是一样的效果。
注意:延时消息放入到队列中,没有被任何消费者监听,如果监听就拿到了,也就被消费了,队列里边的消息只要一过设置的过期时间,就成了死信队列,服务器就会丢弃。
那么,如何设置这个TTL值呢?有两种方式,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
这样所有被投递到该队列的消息都最多不会存活超过6s。
另一种方式便是针对每条消息设置TTL,代码如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
这样这条消息的过期时间也被设置成了6s。
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
死信
死信:Dead Letter Exchange(DLX)
一个消息在满足如下条件,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
- 一个消息被Consumer拒收了,并且reject方法的参数里 requeue 是false。也就是说不会被放在队列里,被其他消费者使用。(basic.reject/basic.nack) requeue=false
- 上面的消息的TTL到了,消息过期了。
- 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由。
Dead Letter Exchange 其实就是一种普通的 exchange,和创建其他exchange一样。只是在某一个设置Dead Letter Exchange 的队列中有信息过期了,会自动触发消息的转发,发送到 Dead Letter Exhange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列。
二、实战
延时关单
场景:用户下单,过了30分钟没有支付,系统会默认关闭该订单,以前可以用定时任务做,现在使用延时队列。
规范设计
设计建议规范(基于事件模型的交换机设计):
1、交换机命名:业务+exchange;交换机为Topic
2、路由键:事件.需要感知的业务(可以不写)
3、队列命名:事件+想要监听服务名+queue
4、绑定关系:事件.感知的业务(#)
整体业务设计:
按照上边的规范设计,对关单业务进行升级设计:
上图说明:交换机 order-event-exchange
绑定了一个延时队列order.delay.queue
,路由key是 order.create.order
, 当创建了一个订单时,会发消息到该延时队列,等到TTL过期,变为死信,会自动触发消息的转发,发送到 Dead Letter Exhange(order-event-exchange) 中去,注意死信路由是 order.release.order
,然后exchange根据路由key order.release.order
转发消息到 order.release.order.queue
队列,客户端监听该队列获取消息。
根据上图的业务设计分析,需要创建两个队列,一个交换机,和两个绑定。gulimall-order/xxx/order/config/MyMQConfig.java
package com.atguigu.gulimall.order.config;
import com.atguigu.gulimall.order.entity.OrderEntity;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.HashMap;
/**
* @author: kaiyi
* @create: 2020-09-16 13:53
*/
@Configuration
public class MyMQConfig {
/* 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下 */
/**
* 客户端监听队列(测试)
* @param orderEntity
* @param channel
* @param message
* @throws IOException
*/
@RabbitListener(queues = "order.release.order.queue")
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息:准备关闭订单" + orderEntity.getOrderSn());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue orderDelayQueue(){
/*
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
return queue;
}
/**
* 普通队列
*
* @return
*/
@Bean
public Queue orderReleaseQueue(){
Queue queue = new Queue("order.release.order.queue", true, false, false);
return queue;
}
/**
* TopicExchange
*
* @return
*/
@Bean
public Exchange orderEventExchange(){
/*
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
* */
return new TopicExchange("order-event-exchange", true, false);
}
@Bean
public Binding orderCreateBinding() {
/*
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order", // 路由key一般为事件名
null);
}
@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
}
然后在控制器创建测试消息:gulimall-order/xxx/order/web/HelloController.java
**
* @author: kaiyi
* @create: 2020-09-12 18:09
*/
@Controller
public class HelloController {
@Autowired
private RabbitTemplate rabbitTemplate;
@ResponseBody
@GetMapping(value = "/test/createOrder")
public String createOrderTest() {
//订单下单成功
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());
//给MQ发送消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
return "ok";
}
}
然后访问该路径 http://order.gulimall.com/test/createOrder, 发送消息,然后去RMQ管理界面可以看到创建的消息已经成功了。
交换机:
交换机绑定的队列(路由key):
队列:
可以看到第一个队列是死信队列,第二个事普通队列
收到的消息为实体对象json:
控制器输出的监控信息:
收到过期的订单信息:准备关闭订单321c3329-d57a-4613-a4ff-331066d4105a
收到过期的订单信息:准备关闭订单44fcf65f-1e7a-40c6-8336-a6c60362920b
相关文章:
【RabbitMQ】一文带你搞定RabbitMQ死信队列
【RabbitMQ】一文带你搞定RabbitMQ延迟队列
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)