消息队列

RabbitMQ如何保证消息不丢失

生产者确认

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。
消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功
Pasted image 20260327221658

消息失败之后如何处理呢?

  • 记录日志
  • 回调方法即时重发
  • 保存到数据库然后定时重发,成功发送后即刻删除表中的数据
  • 人工审查兜底

内存持久化

MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。

Pasted image 20260327221909

消费者确认

RabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
而SpringAMQP则允许配置三种确认模式:

  • manual:手动ack,需要在业务代码结束后,消费者调用API发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

ps: 我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,设置重试次数,当次数达到了以后,
如果消息依然失败,将消息投递到异常交换机,交由人工处理.

RabbitMQ重复消费 • 幂等

由于网络抖动or消费者挂, 服务重启导致msg二次消费

解决方案:

  • 每条消息设置一个唯一的标识 ID
  • 幂等方案:【分布式锁、数据库锁(悲观锁、乐观锁)】

死信交换机 & 延时队列

死信交换机详解

  • 延迟队列:进入队列的消息会被延迟消费的队列
    (rej & nack / 超时 / queue满)
  • 场景:超时订单、限时优惠、定时发布

TTL

DelayExchange插件

延迟队列插件实现延迟队列DelayExchange

  • 声明一个交换机,添加delayed属性为true
  • 发送消息时,添加x-delay头,值为超时时间
Pasted image 20260327223201

注解

bean

消息堆积解决

从两端出发

  • 增加消费者
  • 在消费者中开启线程池
  • 扩大队列容积, 提高堆积上限

惰性队列

惰性队列的特征如下:

  • 声明队列的时候设置属性为x-queue-model为lazy
  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

Bean

1
2
3
4
5
6
7
@Bean
public Queue lazyQueue(){
return QueueBuilder
.durable("name")
.lazy() // 开启x-queue-model为lazy
.build();
}

注解

1
2
3
4
5
6
@RabbitListener(queueToDeclare = 
@Queue(
name="",
durable="",
arguments =@Argument(name="", value=""))
)

高可用机制

普通集群

普通集群,or 标准集群(classiccluster),具备下列特征:

  • 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
  • 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
  • 队列所在节点宕机,队列中的消息就会丢失
Pasted image 20260328124046

镜像集群

镜像集群:本质是主从模式,具备下面的特征:

  • 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份
  • 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另一个队列的镜像节点
  • 所有操作都是主节点完成,然后同步给镜像节点
  • 主宕机后,镜像节点会替代成新的主节点

ps: 在主从同步完成前, 主就已经宕机, 可能出现数据丢失

Pasted image 20260328125918

仲裁队列

仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 使用非常简单,没有复杂的配置
  • 主从同步基于Raft协议,强一致 (解决数据丢失)
1
2
3
4
5
6
7
@Bean
public Queue quorumQueue(){
return QueueBuilder
.durable("name") //持久化
.quorum() //仲裁队列
.build();
}