分布式事务:消息可靠发送

接上文分布式事务:基于可靠消息服务介绍了整体中间件的设计思路,有些内容没有展开。故此,本文详细讲解下如何将消息可靠发送到Rabbitmq。 🥕

在上文简单提到了如何将消息进行可靠发送,因为shine-mq是无缝集成spring-boot-starter的,所以rabbitmq的操作也是基于spring的rabbitTemplate来完成的。

rabbitTemplate提供了setConfirmCallback方法,可以在消息发送到RabbitMQ交换器后,进行ack的回调。

1
2
3
4
5
6
7
8
9
10
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
...
//消息发送到RabbitMQ交换器后接收ack回调
//如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack返回值为还是true(这里是一个坑,后面仔细讲解)
if (ack) {
log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
...
}
...
});

在此之前还需要设置CachingConnectionFactory

1
2
//设置生成者确认机制
rabbitConnectionFactory.setPublisherConfirms(true);

如果还需要设置setReturnCallback(消息发送到RabbitMQ交换器,但无相应queue时的回调),那就还需要设置rabbitTemplate

1
2
//使用return-callback时必须设置mandatory为true
rabbitTemplate.setMandatory(true);

这里需要知道的是ReturnCallback比ConfirmCallback先回调。

上面提到了setConfirmCallback返回ack的一个坑,就是当消息成功发送到交换器,但是没有匹配的队列,就会触发 ReturnCallback 回调,而且消息也丢失了,最致命的是setConfirmCallback回调返回的ack却是true,如果单靠这个ack来判断消息是否成功到达mq,就有一定概率造成消息丢失。

要解决的话,可以在setReturnCallback做一个缓存,因为上面的情况会先触发 ReturnCallback 回调,我们缓存这个状态,在setConfirmCallback回调的时候,结合ack和之前缓存的状态进行判断是否真的发送成功。

下面是shine-mq实现的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//消息发送到RabbitMQ交换器后接收ack回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData != null) {
log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
String msgId = correlationData.getId();
CorrelationDataExt ext = (CorrelationDataExt) correlationData;
Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
coordinator.confirmCallback(correlationData, ack);
// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack返回值为还是true(这里是一个坑,需要注意)
if (ack && !coordinator.getReturnCallback(msgId)) {
log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
coordinator.delReady(msgId);
} else {
//失败了判断重试次数,重试次数大于0则继续发送
if (ext.getMaxRetries() > 0) {
try {
rabbitmqFactory.setCorrelationData(msgId, ext.getCoordinator(), ext.getMessage(),
ext.getMaxRetries() - 1);
rabbitmqFactory.getTemplate().send(ext.getMessage(), 0, 0, SendTypeEnum.DISTRIBUTED);
} catch (Exception e) {
log.error("Message retry failed to send, message:{} exception: ", ext.getMessage(), e);
}
} else {
log.error("Message delivery failed, msgId: {}, cause: {}", msgId, cause);
}
}
coordinator.delReturnCallback(msgId);
}
});
//使用return-callback时必须设置mandatory为true
rabbitTemplate.setMandatory(true);
//消息发送到RabbitMQ交换器,但无相应queue时的回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
String messageId = message.getMessageProperties().getMessageId();
String coordinatorName = messageId.split(MqConstant.SPLIT)[0];
Coordinator coordinator = (Coordinator) applicationContext.getBean(coordinatorName);
coordinator.setReturnCallback(messageId);
log.error("ReturnCallback exception, no matching queue found. message id: {}, replyCode: {}, replyText: {},"
+ "exchange: {}, routingKey: {}", messageId, replyCode, replyText, exchange, routingKey);
});

熟悉Rabbitmq的同学可能知道,Rabbitmq有两种机制来实现消息的可靠发送。

  • 通过事务机制,这个上篇文章分析过,在这个模式下,rabbitmq的效率很低,不适合。
  • Confirm模式,这个模式下会有三种方式,分别是:
    • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
    • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm
    • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。(rabbitTemplate设置回调就是这个模式)

所以我们知道了rabbitTemplate提供的确认机制是一种异步机制,并不能同步的发现问题,也就是说在极端的网络条件下是会出现消息丢失的。

所以shine-mq通过增加一个Coordinator(协调者)来实现。Coordinator会保存2个状态,一个是prepare(携带回查id),这个状态在前文说过是用来保证上游服务的任务状态的。

而另一个状态ready,就是来保证消息的可靠投递。

首先shine-mq是使用@DistributedTrans来开启。在这个注解的切面里,先持久化ready状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Around(value = "@annotation(trans)")
public void around(ProceedingJoinPoint pjp, DistributedTrans trans) throws Throwable {
...
try {
EventMessage message = new EventMessage(exchange, routeKey, SendTypeEnum.DISTRIBUTED.toString(), checkBackId,
coordinatorName, msgId);
//将消息持久化
coordinator.setReady(msgId, checkBackId.toString(), message);
rabbitmqFactory.setCorrelationData(msgId, coordinatorName, message, null);
rabbitmqFactory.addDLX(exchange, exchange, routeKey, null, null);
if (flag) {
rabbitmqFactory.add(MqConstant.DEAD_LETTER_QUEUE, MqConstant.DEAD_LETTER_EXCHANGE,
MqConstant.DEAD_LETTER_ROUTEKEY, null, null);
flag = false;
}
rabbitmqFactory.getTemplate().send(message, 0, 0, SendTypeEnum.DISTRIBUTED);
} catch (Exception e) {
log.error("Message failed to be sent : ", e);
throw e;
}
}

然后在回调中删除该状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//消息发送到RabbitMQ交换器后接收ack回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData != null) {
log.info("ConfirmCallback ack: {} correlationData: {} cause: {}", ack, correlationData, cause);
String msgId = correlationData.getId();
CorrelationDataExt ext = (CorrelationDataExt) correlationData;
Coordinator coordinator = (Coordinator) applicationContext.getBean(ext.getCoordinator());
//可以自定义实现的回调
coordinator.confirmCallback(correlationData, ack);
// 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack返回值为还是true(这里是一个坑,需要注意)
if (ack && !coordinator.getReturnCallback(msgId)) {
log.info("The message has been successfully delivered to the queue, correlationData:{}", correlationData);
//删除ready状态
coordinator.delStatus(msgId);
} else {
...
}
}
});

因为存储ready是在上游服务任务执行之后的,所以只要有超时的ready记录未被清理掉,daemon(守护线程)只管捞起来进行重发就行,因为Mq的可靠性投递就已经要求下游服务是需要保证幂等性了。

最后还有个极端的情况,就是ready消息存储的时候因为网络抖动该消息丢失了,这时候也没有关系,因为有prepare状态会进行回查,该状态只有在ready存储后才会触发删除。

如果对你有帮助,那就帮忙点个星星把 ^.^

github地址:https://github.com/7le/shine-mq


Github 不要吝啬你的star ^.^
更多精彩 戳我

Follow me on GitHub