谷粒商城-高级-66 -商城业务-消息队列-可靠投递

一、发送端确认

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入确认机制。

  • publisher confirmCallback 确认模式
  • publisher returnCallback 未投递到 queue 退回模式
  • consumer ack机制

file

修改 application.properties 文件:

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true

创建配置文件:gulimall-order/xxx/order/config/MyRabbitConfig.java
数据转为Json,并且确认机制:

package com.atguigu.gulimall.order.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;

/**
 * @author: kaiyi
 * @create: 2020-09-11 14:21
 */

@Configuration
public class MyRabbitConfig {

  @Autowired
  RabbitTemplate rabbitTemplate;

  @Primary
  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    this.rabbitTemplate = rabbitTemplate;
    rabbitTemplate.setMessageConverter(messageConverter());
    initRabbitTemplate();
    return rabbitTemplate;
  }

  /**
   * 使用JSON序列化机制,进行消息转换
   *
   * @return
   */
  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  /**
   * 定制RabbitTemplate
   * 1、服务收到消息就会回调
   * 1、spring.rabbitmq.publisher-confirms: true
   * 2、设置确认回调
   * 2、消息正确抵达队列就会进行回调
   * 1、spring.rabbitmq.publisher-returns: true
   * spring.rabbitmq.template.mandatory: true
   * 2、设置确认回调ReturnCallback
   * <p>
   * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
   */
  // @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
  public void initRabbitTemplate() {

    /**
     * 1、只要消息抵达Broker就ack=true
     * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
     * ack:消息是否成功收到
     * cause:失败的原因
     */
    //设置确认回调
    rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
      System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
    });

     /**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
  }
}

二、消费端确认

可靠抵达-Ack消息确认机制说明:

1、消费者获取到消息,成功处理,可以回复Ack给Broker

  • basic.ack 用于肯定确认;broker将移除此消息
  • basic.nack 用于否定确认;可以指定broker是否丢弃此消息,可以批量
  • basic.reject 用于否定确认;同上,但不能批量

2、默认,消息被消费者收到,就会从broker的queue中移除
3、queue无消费者,消息依然会被存储,直到消费者消费
4、消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式。

  • 消息处理成功,ack(),接受下一个消息,此消息broker就会移除。
  • 消息处理失败,nack()/reject(), 重新发送给其他人处理,或者容错处理后ack。
  • 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人。

配置修改

修改 application.properties 配置文件:

# 开启发送端消息抵达Broker确认
spring.rabbitmq.publisher-confirms=true
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue,就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息,不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

测试

com/atguigu/gulimall/order/service/impl/OrderItemServiceImpl.java

 //@RabbitListener(queues = {"hello-java-queue"})
    @RabbitHandler
    public void revieveMessage(Message message,
        OrderReturnReasonEntity content, Channel channel) throws IOException {

        System.out.println("接收到消息..."+content);

        //拿到主体内容
        byte[] body = message.getBody();

        //拿到的消息头属性信息
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("接受到的消息...内容" + message + "===内容:" + content);

        // Thread.sleep(3000);
        // Channel内按顺序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag===>" + deliveryTag);

        try{
            // 签收货物,非批量模式
            channel.basicAck(deliveryTag, false);
        }catch (Exception e){
            // 网络中断(突然)
        }

    }

相关文章:
Springboot 整合RabbitMq ,用心看完这一篇就够了

为者常成,行者常至