RocketMQ——同步消息、异步消息发送源码分析
RocketMQ中,消息发送模式有三种,如下:
1 |
|
- SYNC:同步消息
同步消息会阻塞等待消息发送结果 - ASYNC:异步消息
异步消息发送消息时间,不会等待消息结果,对应的消息回调结果会到相应的Callback回调函数里面返回,不会阻塞用户线程,吞吐量相对较高。 - ONEWAY:单向消息
单向消息,又称不可靠消息,不保证消息的可靠性,只管发,不管发送成功或失败
不同模式消息发送示例
1 |
|
消息发送原理分析
针对三种不同模式的消息类型,在进行消息发送的时候,都会调用到如下方法
1 |
|
同步消息调用如下:
1
this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout)
单向消息调用如下:
1
this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
异步消息调用如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
public void run() {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
sendCallback.onException(e);
}
} else {
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
}异步消息在调用sendDefaultImpl时,是将该调用任务提交给AsyncSenderExecutor线程池完成的,这样在发送消息的过程中可以尽可能的提高消息发送吞吐量,具体的发送逻辑和上面两种逻辑一样,
需要说明的是老版本中,异步逻辑不是通过将sendDefaultImpl的逻辑交给AsyncSenderExecutor这个线程池中实现的,而是在NettyRemotingAbstract.invokeAsyncImpl消息发送完后数据才返回,后续新增了AsyncSenderExecutor这个逻辑,真正意义上达到了消息的异步处理,提升的异步消息的吞吐量,
sendDefaultImpl的调用如下:
1 |
|
sendDefaultImpl分析
sendDefaultImpl 源码如下,省略部分不重要逻辑:
主要做了如下如下几件事情:
- 服务状态校验
- 消息相关信息校验
- 获取当前消息Topic对应的订阅信息,如果获取不到再从Nameserver中获取
- 根据配置的重试次数计算出当前消息最多能够发送几次(重试次数+1),默认配置重试次数为2 private int retryTimesWhenSendFailed = 2;
- 选择消息具体要发送到哪个MessageQueue
- 消息发送 sendKernelImpl
- 消息重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 状态校验,一定要保证当前服务状态是运行状态 ServiceState.RUNNING
this.makeSureStateOK();
/**
* 消息校验,包括TOPIC、消息是否为空,消息体是否为空,以及消息大小的校验
* 消息大小默认最大支持:private int maxMessageSize = 1024 * 1024 * 4; // 4M
*/
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong(); // 随机一个 invodeId,打印日志用的
// 定义用于计算超时时间的时间戳属性
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 获取Topic订阅信息,后面会单独介绍到
// 从当前客户端缓存的路由表中获取对应的订阅信息,如果获取不到,再从都Nameserver获取并更新对应的订阅纤细
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
// 用于表示是否超时
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 根据配置的重试次数,计算出一条消息最多可以发送的次数
// 默认当消息发送失败的时候重试次数为2,即最多可尝试3次(默认)
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择需要发送到哪个MessageQueue,再选择MessageQueue中,会用到很多的负载思想,在后续的文章中会详细说明
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
// 调用sendKernelImpl 之前有可能超时 timeout 就是配置的超时时间
// 前面会有一系列操作,例如获取订阅信息和选择MessageQueue,可能包含一下网络操作,可能会超时,所以这个地方做一次超时时间判断是很有必要的
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// 实际发送消息到内核的调用方法,后续会介绍到
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// 故障延时策略(容错)相关属性更新
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
// 如果是同步消息,返回的不是成功标识,并且retryAnotherBrokerWhenNotStoreOK=true (默认是false),则触发重试机制
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
switch (e.getResponseCode()) {
case ResponseCode.TOPIC_NOT_EXIST:
case ResponseCode.SERVICE_NOT_AVAILABLE:
case ResponseCode.SYSTEM_ERROR:
case ResponseCode.NO_PERMISSION:
case ResponseCode.NO_BUYER_ID:
case ResponseCode.NOT_IN_CURRENT_UNIT:
continue;
default:
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
throw e;
}
} else {
break;
}
}
// ... 省略异常信息封装逻辑
throw mqClientException;
}
}
this.tryToFindTopicPublishInfo(msg.getTopic()) 方法分析
Producer在进行消息发送的时候,因为要选择对应的Broker以及消息要发送到哪个MessageQueue,所以在发送消息之前要先获取到当前消息Topic对应的订阅信息,
首先会先到 ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable ,获取,如果获取到,则直接返回对应的TopicPublishInfo,如果获取不到,则会调用updateTopicRouteInfoFromNameServer从Nameserver中获取,并更新topicPublishInfoTable,然后返回。
TopicPublishInfo主要包含了如下三个属性和两个标识字段:
- private List
messageQueueList = new ArrayList (); // 对应的消息队列List - private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 选择MessageQueue时计算index,从而对MessageList.size()取模用来获取对应的MessageQueue的
- private TopicRouteData topicRouteData; // 路由信息
tryToFindTopicPublishInfo具体代码如下:
1 |
|
updateTopicRouteInfoFromNameServer逻辑如下:
1 |
|
getTopicRouteInfoFromNameServer 方法解析
从Nameserver中获取TopciRouteInfo信息
在获取对应路由信息时,会先封装一个RequestCode为GET_ROUTEINTO_BY_TOPIC的Request,然后通过this.remotingClient.invokeSync()向nameserver 发起请求,
Nameserver端会通过DefaultRequestProcessor(网络请求处理核心类)中的processRequest处理对应的ReqeustCode的请求逻辑,在processRequest的请求中会对应有一个case RequestCode.GET_ROUTEINTO_BY_TOPIC 的分支,专门用于处理该类请求的逻辑。
(可参考前面Nameserver相关文章了解整个请求的调用过程和处理流程)
1 |
|
sendResult = this.sendKernelImpl 方法分析
源码如下(省略不重要逻辑):
1 |
|
this.mQClientFactory.getMQClientAPIImpl().sendMessage 逻辑:
在sendMessage发送逻辑中,针对同步消息和异步消息,在最有分别调用了各自的实现方法,分别是
- 同步:
this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request) - 异步:
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);
同步消息发送处理逻辑:
同步消息在发送会调用到this.invokeSyncImpl(channel, request, timeoutMillis - costTime),
该逻辑中主要是channel.writeAndFlush(request)操作,以及同步阻塞(通过countDownLatch实现),需要注意的是countDownLatch是有超时时间控制的,当达到超时的限制后,如果response还能没有回来,则直接抛异常给Client,Client可能会存在误判的场景。
具体逻辑如下:
1 |
|
异步消息发送处理逻辑:
对于异步消息而言,和同步消息类似,唯一不同的地方是,异步消息在发送后,没有responseFuture.waitResponse(timeoutMillis)这一步操作,具体的成功与失败状态的返回是在处理回调逻辑中完成的
异步消息回调函数执行逻辑;
前面有提到,因为异步消息在发送消息时,没有在发送消息时等待对应的发送状态。
异步消息的发送处理结果有对应的处理逻辑,处理逻辑如下:
1 |
|
NettyClientHandler是在Producer启动的时候,调用 this.mQClientAPIImpl.start() 时,会执行remotingClient.start()方法,该方法在执行的时候,在初始化Channel的时候会向ChannelPipeline注入NettyClientHandler,用来处理REQUEST_COMMAND 或者RESPONSE_COMMAND。
对于客户端请求Broker而言,发送过来的请求,在执行的时候,会执行到 RESPONSE_COMMAND 分支,REQUEST_COMMAND 主要是用来处理服务端向客户端发起的请求,例如 事务回查和消息Rebalence等操作
processResponseCommand逻辑如下:
1 |
|
**executeInvokeCallback **
1 |
|
responseFuture.executeInvokeCallback();
在执行该逻辑的时候,会调用到 sendMessageAsync中 remotingClient.invokeAsync方法参数中的 InvokeCallback 这个回调函数中的 operationComplate(ResponseFuture responseFuture) 方法,
执行到该方法中的处理逻辑后,根据消息的处理结果去调用Callback中的onSuccess函数或者onException函数,处理结果类似同步消息处理逻辑,参考同步消息的处理