谷粒商城-高级-66 -商城业务-消息队列-可靠投递
一、发送端确认
保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。
- publisher confirmCallback 确认模式
- publisher returnCallback 未投递到 queue 退回模式
- consumer ack机制

修改 application.properties 文件:
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
创建配置文件:gulimall-order/xxx/order/config/MyRabbitConfig.java
数据转为Json,并且确认机制:
package com.atguigu.gulimall.order.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
/**
* @author: kaiyi
* @create: 2020-09-11 14:21
*/
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
/**
* 使用JSON序列化机制,进行消息转换
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
* <p>
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {
/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});
/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}
二、消费端确认
可靠抵达-Ack消息确认机制说明:
1、消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack 用于肯定确认;broker将移除此消息
- basic.nack 用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject 用于否定确认;同上,但不能批量
2、默认,消息被消费者收到,就会从broker的queue中移除
3、queue无消费者,消息依然会被存储,直到消费者消费
4、消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式。
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除。
- 消息处理失败,nack()/reject(), 重新发送给其他人处理,或者容错处理后ack。
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人。
配置修改
修改 application.properties 配置文件:
# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
测试
com/atguigu/gulimall/order/service/impl/OrderItemServiceImpl.java
//@RabbitListener(queues = {"hello-java-queue"})
@RabbitHandler
public void revieveMessage(Message message,
OrderReturnReasonEntity content, Channel channel) throws IOException {
System.out.println("接收到消息..."+content);
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===内容:" + content);
// Thread.sleep(3000);
// Channel内按顺序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("deliveryTag===>" + deliveryTag);
try{
// 签收货物,非批量模式
channel.basicAck(deliveryTag, false);
}catch (Exception e){
// 网络中断(突然)
}
}
相关文章:
Springboot 整合RabbitMq ,用心看完这一篇就够了
为者常成,行者常至
自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)