MQ高级

可靠性

Pasted image 20260327095747

生产者

生产者重连

由于网络波动,可能会出现客户端连接 MQ 失败的情况。
通过配置我们可以开启连接失败后的重连机制:

发信层 template.retry()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
spring:
rabbitmq:
host: 192.168.1.100
port: 5672
virtual-host: /
username: admin
password: 123456

# ===== 重连配置 =====
connection-timeout: 1s # 建立连接超时时间
template:
retry:
enabled: true # 开启 RabbitTemplate 的重试
initial-interval: 1000ms # 首次重试等待 1s
multiplier: 2 # 每次重试等待时间 * 2(指数退避) 下次等待=initial*mult
max-attempts: 3 # 最多重试 3 次(含首次)
max-interval: 10000ms # 重试等待上限 10s

SpringAMQP 提供的重试机制是阻塞式的重试 即多次重试等待的过程中, 当前线程是被阻塞的, 会影响业务性能。

如果对于业务性能有要求,建议禁用重试机制。
如果一定要使用,请合理配置等待时长和重试次数,或使用异步线程来执行发送消息的代码。

连接层面的自动恢复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 连接层面的自动恢复配置
* Spring AMQP 底层 AMQP Client 默认已开启 automatic recovery
* 但生产环境建议显式配置,更可控
*/
@Configuration
public class RabbitConnectionConfig {

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("192.168.1.100");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/");

// 连接缓存模式:CHANNEL(默认)vs CONNECTION
// CHANNEL 模式:一个物理连接,多个 channel 复用
// CONNECTION 模式:每次 getConnection() 可能返回新连接(适合高并发)
factory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
factory.setChannelCacheSize(10); // channel 缓存数量

return factory;
}
}

生产者确认

发消息出去了,但消息真的到 Broker 了吗?
到了 Exchange 之后路由到 Queue 了吗?这就是 Publisher Confirms + Returns 要解决的问题。

  • 消息投递到了 MQ , 但是路由失败。此时会通过 PublisherReturn 返回路由异常原因, 然后返回 ACK, 告知投递成功
  • 临时消息投递到了 MQ , 并且入队成功, 返回 ACK, 告知投递成功
  • 持久消息投递到了 MQ , 并且入队完成持久化, 返回 ACK, 告知投递成功
  • 其他都是失败, 返回NACK
flowchart LR
    P[Publisher<br/>rabbitTemplate]

    E1[Topic Exchange]
    E2[Exchange2]

    Q1[Queue1<br/>non durable]
    Q2[Queue2<br/>durable]

    C1[Consumer1]
    C2[Consumer2]

    S[(Persistent Storage)]

    P --> E1
    P --> E2

    E1 --> Q1
    E1 --> Q2

    Q1 --> C1
    Q2 --> C2

    Q2 -. persist .-> S

    E1 -. ack .-> P
    E2 -. ack .-> P
    E2 -. return .-> P

三种 ConfirmType 模式需要特别理解:
NONE 是默认值,完全不确认,性能最好,消息丢了也不知道。
CORRELATED 是真正的异步确认,每条消息有唯一 correlationId,Broker 处理后异步回调,性能好且可靠,生产推荐。
SIMPLE同步等待确认,发一条等一条,性能差,不推荐。

生产者确认代码实现

配置

1
2
3
4
5
6
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启 CORRELATED 异步确认
publisher-returns: true # 开启 Return 退回回调
template:
mandatory: true # 消息路由失败时退回而非丢弃(触发 Return)

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* Publisher Confirms + Returns 完整实现
*/
@Slf4j
@Configuration
public class PublisherConfirmConfig implements RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 注意:RabbitTemplate 是 prototype 的,但这里通过 @PostConstruct
* 把 callback 注入进去,每个 RabbitTemplate 实例只设置一次
*/
@PostConstruct
public void init() { //接口的子实现类, 所以init可以直接this
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

/**
* Confirm 回调
* 消息成功到达 Exchange → ack=true
* Broker 内部错误 → ack=false, cause 说明原因
*
* @param correlationData 发送时传入的关联数据(含消息ID)
* @param ack 是否确认成功
* @param cause 失败原因(ack=true 时为 null)
*/

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String msgId = correlationData != null ? correlationData.getId() : "unknown";

if (ack) {
log.info("[Confirm ACK] 消息已到达Exchange, msgId={}", msgId);
// 可选:更新数据库消息状态为"已投递"
} else {
log.error("[Confirm NACK] 消息投递Exchange失败, msgId={}, cause={}", msgId, cause);
// 触发补偿:重发消息 / 告警
// 注意:这里是异步线程,不能抛异常影响主流程
handleSendFailed(msgId, cause);
}
}

/**
* Return 回调(路由失败时触发)
* 消息到了 Exchange 但没有匹配的 Queue binding
*
* @param returned 退回的消息详情(含原始 Message、routingKey、replyText 等)
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("[Return] 消息路由失败被退回! exchange={}, routingKey={}, replyCode={}, replyText={}, body={}",
returned.getExchange(),
returned.getRoutingKey(),
returned.getReplyCode(),
returned.getReplyText(),
new String(returned.getMessage().getBody()));
// 处理:检查 routingKey / binding 配置是否正确
// 生产建议:写入 DB 做人工复查,或者投递到专用的 unroutable 队列
}

private void handleSendFailed(String msgId, String cause) {
// 示例:从 DB 捞出消息体重新发送
// messageService.markFailed(msgId);
// alertService.send("MQ投递失败: " + msgId);
}
}