谷粒商城-高级-78 -商城业务-消息丢失、重复、积压等解决方案
高并发场景的分布式事务,我们采用柔性事务+可靠消息+最终一致性方案(异步确保型),可靠性是最重要的,那么如何保证消息的可靠性呢?
一、消息丢失
1、消息发送出去,由于网络问题没有抵达服务器
- 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有容错机制,可记录到数据库,采用定期扫描重发的方式。
- 做好日志记录,每个消息状态是否都被服务器收到都应该记录
- 做好定期重发,如果消息没有发送成功,定期去数据库扫描未成功的消息进行重发
代码示例:
/**
* 关闭订单
* @param orderEntity
*/
@Override
public void closeOrder(OrderEntity orderEntity) {
//关闭订单之前先查询一下数据库,判断此订单状态是否已支付
OrderEntity orderInfo = this.getOne(new QueryWrapper<OrderEntity>().
eq("order_sn",orderEntity.getOrderSn()));
if (orderInfo.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
//代付款状态进行关单
OrderEntity orderUpdate = new OrderEntity();
orderUpdate.setId(orderInfo.getId());
orderUpdate.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(orderUpdate);
// 发送消息给MQ
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderInfo, orderTo);
try {
//TODO 确保每个消息发送成功,给每个消息做好日志记录,(给数据库保存每一个详细信息)保存每个消息的详细信息
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);
} catch (Exception e) {
//TODO 定期扫描数据库,重新发送失败的消息
// while() 重试次数
}
}
}
创建消息日志记录表:
2、消息抵达Broker,Broker要将消息写入磁盘(持久化)才算成功。此时Broker尚未持久化完成,宕机。
- publisher 也必须加入确认回调机制,确认成功的消息,修改数据库消息状态。
生产者消息确认回调应该增加日志记录,确认回调成功后修改记录日志的状态:gulimall-order/xxx/order/config/MyRabbitConfig.java
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
* <p>
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
/**
* 1、做好消息确认机制(publisher,consumer【手动ack】】)
* 2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍
*/
// 服务器收到生产者发送的消息了
// 修改消息的状态
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 2、只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
3、自动ACK的状态下。消费者收到消息,但没来得及消费然后宕机。
- 一定开启手动ACK,消费成功才移除,失败或者还没来得及处理就 noAck并重新入队。
防止消息丢失记住这两条:
1、做好消息确认机制(publisher,consumer【手动ack】】)
2、每一个发送的消息都在数据库做好记录。定期将失败的消息再发送一遍
二、消息重复
出现重复的几种情况
- 1、消息消费成功,事务已经提交,ack时,机器宕机,导致没有ack成功。
- Broker的消息重新由 unack 变为ready,并发送给其他消费者
- 2、消息消费失败,由于重试机制,自动又将消息发送出去。
- 3、成功消费,ack时宕机,消息又unack变为ready,Broker又重新发送
解决方案
- 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态标识。
- 使用防重表(redis/mysql),发送消息每一个都有业务的唯一标识,处理过就不用再处理。
- rabbitMQ的每一个消息都有 redilivered字段,可以获取是否被重新投递过来的,而不是第一次被投递过来的。
三、消息积压
- 消费者宕机
- 消费者消费能力不足
- 发送者发送流量太大
- 上线更多的消费者,进行正常的消费
- 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)