可靠性
生产者
生产者重连
由于网络波动,可能会出现客户端连接 MQ 失败的情况。
通过配置我们可以开启连接失败后的重连机制:
发信层 template.retry()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| spring: rabbitmq: host: 192.168.1.100 port: 5672 virtual-host: / username: admin password: 123456 # ===== 重连配置 ===== connection-timeout: 1s # 建立连接超时时间 template: retry: enabled: true # 开启 RabbitTemplate 的重试 initial-interval: 1000ms # 首次重试等待 1s multiplier: 2 # 每次重试等待时间 * 2(指数退避) 下次等待=initial*mult max-attempts: 3 # 最多重试 3 次(含首次) max-interval: 10000ms # 重试等待上限 10s
|
SpringAMQP 提供的重试机制是阻塞式的重试 即多次重试等待的过程中, 当前线程是被阻塞的, 会影响业务性能。
如果对于业务性能有要求,建议禁用重试机制。
如果一定要使用,请合理配置等待时长和重试次数,或使用异步线程来执行发送消息的代码。
连接层面的自动恢复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
@Configuration public class RabbitConnectionConfig {
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost("192.168.1.100"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/");
factory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL); factory.setChannelCacheSize(10);
return factory; } }
|
生产者确认
发消息出去了,但消息真的到 Broker 了吗?
到了 Exchange 之后路由到 Queue 了吗?这就是 Publisher Confirms + Returns 要解决的问题。
- 消息投递到了 MQ , 但是路由失败。此时会通过 PublisherReturn 返回路由异常原因, 然后返回 ACK, 告知投递成功
- 临时消息投递到了 MQ , 并且入队成功, 返回 ACK, 告知投递成功
- 持久消息投递到了 MQ , 并且入队完成持久化, 返回 ACK, 告知投递成功
- 其他都是失败, 返回NACK
flowchart LR
P[Publisher<br/>rabbitTemplate]
E1[Topic Exchange]
E2[Exchange2]
Q1[Queue1<br/>non durable]
Q2[Queue2<br/>durable]
C1[Consumer1]
C2[Consumer2]
S[(Persistent Storage)]
P --> E1
P --> E2
E1 --> Q1
E1 --> Q2
Q1 --> C1
Q2 --> C2
Q2 -. persist .-> S
E1 -. ack .-> P
E2 -. ack .-> P
E2 -. return .-> P
三种 ConfirmType 模式需要特别理解:
NONE 是默认值,完全不确认,性能最好,消息丢了也不知道。
CORRELATED 是真正的异步确认,每条消息有唯一 correlationId,Broker 处理后异步回调,性能好且可靠,生产推荐。
SIMPLE 是同步等待确认,发一条等一条,性能差,不推荐。
生产者确认代码实现
配置
1 2 3 4 5 6
| spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
|
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
@Slf4j @Configuration public class PublisherConfirmConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String msgId = correlationData != null ? correlationData.getId() : "unknown";
if (ack) { log.info("[Confirm ACK] 消息已到达Exchange, msgId={}", msgId); } else { log.error("[Confirm NACK] 消息投递Exchange失败, msgId={}, cause={}", msgId, cause); handleSendFailed(msgId, cause); } }
@Override public void returnedMessage(ReturnedMessage returned) { log.error("[Return] 消息路由失败被退回! exchange={}, routingKey={}, replyCode={}, replyText={}, body={}", returned.getExchange(), returned.getRoutingKey(), returned.getReplyCode(), returned.getReplyText(), new String(returned.getMessage().getBody())); }
private void handleSendFailed(String msgId, String cause) { } }
|