RocketMQ——同步消息、异步消息发送源码分析

RocketMQ中,消息发送模式有三种,如下:

1
2
3
4
5
public enum CommunicationMode {
SYNC,
ASYNC,
ONEWAY,
}
  1. SYNC:同步消息
    同步消息会阻塞等待消息发送结果
  2. ASYNC:异步消息
    异步消息发送消息时间,不会等待消息结果,对应的消息回调结果会到相应的Callback回调函数里面返回,不会阻塞用户线程,吞吐量相对较高。
  3. ONEWAY:单向消息
    单向消息,又称不可靠消息,不保证消息的可靠性,只管发,不管发送成功或失败

不同模式消息发送示例

1
2
3
4
5
6
7
8
9
10
11
12
13
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
// 同步消息
producer.send(msg);
// 异步消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {}
@Override
public void onException(Throwable e) {}
});
// 单向消息
producer.sendOneway(msg);

消息发送原理分析

针对三种不同模式的消息类型,在进行消息发送的时候,都会调用到如下方法

1
2
3
4
5
6
7
8
private SendResult sendDefaultImpl(
Message msg, // 发送的消息
final CommunicationMode communicationMode, // 消息模式 同步、异步、单向
final SendCallback sendCallback, // 回调函数
final long timeout ){ // 超时时间
// ......
}

  1. 同步消息调用如下:

    1
    this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout)
  2. 单向消息调用如下:

    1
    this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
  3. 异步消息调用如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public void send(final Message msg, final SendCallback sendCallback, final long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
    executor.submit(new Runnable() {
    @Override
    public void run() {
    long costTime = System.currentTimeMillis() - beginStartTime;
    if (timeout > costTime) {
    try {
    sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
    } catch (Exception e) {
    sendCallback.onException(e);
    }
    } else {
    sendCallback.onException(
    new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
    }
    }
    });
    } catch (RejectedExecutionException e) {
    throw new MQClientException("executor rejected ", e);
    }
    }

    异步消息在调用sendDefaultImpl时,是将该调用任务提交给AsyncSenderExecutor线程池完成的,这样在发送消息的过程中可以尽可能的提高消息发送吞吐量,具体的发送逻辑和上面两种逻辑一样,
    需要说明的是老版本中,异步逻辑不是通过将sendDefaultImpl的逻辑交给AsyncSenderExecutor这个线程池中实现的,而是在NettyRemotingAbstract.invokeAsyncImpl消息发送完后数据才返回,后续新增了AsyncSenderExecutor这个逻辑,真正意义上达到了消息的异步处理,提升的异步消息的吞吐量,

sendDefaultImpl的调用如下:

1
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);

sendDefaultImpl分析

sendDefaultImpl 源码如下,省略部分不重要逻辑:

主要做了如下如下几件事情:

  1. 服务状态校验
  2. 消息相关信息校验
  3. 获取当前消息Topic对应的订阅信息,如果获取不到再从Nameserver中获取
  4. 根据配置的重试次数计算出当前消息最多能够发送几次(重试次数+1),默认配置重试次数为2 private int retryTimesWhenSendFailed = 2;
  5. 选择消息具体要发送到哪个MessageQueue
  6. 消息发送 sendKernelImpl
  7. 消息重试
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // 状态校验,一定要保证当前服务状态是运行状态 ServiceState.RUNNING
    this.makeSureStateOK();
    /**
    * 消息校验,包括TOPIC、消息是否为空,消息体是否为空,以及消息大小的校验
    * 消息大小默认最大支持:private int maxMessageSize = 1024 * 1024 * 4; // 4M
    */
    Validators.checkMessage(msg, this.defaultMQProducer);
    final long invokeID = random.nextLong(); // 随机一个 invodeId,打印日志用的
    // 定义用于计算超时时间的时间戳属性
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // 获取Topic订阅信息,后面会单独介绍到
    // 从当前客户端缓存的路由表中获取对应的订阅信息,如果获取不到,再从都Nameserver获取并更新对应的订阅纤细
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
    // 用于表示是否超时
    boolean callTimeout = false;
    MessageQueue mq = null;
    Exception exception = null;
    SendResult sendResult = null;
    // 根据配置的重试次数,计算出一条消息最多可以发送的次数
    // 默认当消息发送失败的时候重试次数为2,即最多可尝试3次(默认)
    int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    int times = 0;
    String[] brokersSent = new String[timesTotal];
    for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    // 选择需要发送到哪个MessageQueue,再选择MessageQueue中,会用到很多的负载思想,在后续的文章中会详细说明
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
    mq = mqSelected;
    brokersSent[times] = mq.getBrokerName();
    try {
    beginTimestampPrev = System.currentTimeMillis();
    if (times > 0) {
    //Reset topic with namespace during resend.
    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
    }
    // 调用sendKernelImpl 之前有可能超时 timeout 就是配置的超时时间
    // 前面会有一系列操作,例如获取订阅信息和选择MessageQueue,可能包含一下网络操作,可能会超时,所以这个地方做一次超时时间判断是很有必要的
    long costTime = beginTimestampPrev - beginTimestampFirst;
    if (timeout < costTime) {
    callTimeout = true;
    break;
    }
    // 实际发送消息到内核的调用方法,后续会介绍到
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    endTimestamp = System.currentTimeMillis();
    // 故障延时策略(容错)相关属性更新
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
    switch (communicationMode) {
    case ASYNC:
    return null;
    case ONEWAY:
    return null;
    case SYNC:
    // 如果是同步消息,返回的不是成功标识,并且retryAnotherBrokerWhenNotStoreOK=true (默认是false),则触发重试机制
    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
    continue;
    }
    }
    return sendResult;
    default:
    break;
    }
    } catch (RemotingException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    exception = e;
    continue;
    } catch (MQClientException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    exception = e;
    continue;
    } catch (MQBrokerException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
    log.warn(msg.toString());
    exception = e;
    switch (e.getResponseCode()) {
    case ResponseCode.TOPIC_NOT_EXIST:
    case ResponseCode.SERVICE_NOT_AVAILABLE:
    case ResponseCode.SYSTEM_ERROR:
    case ResponseCode.NO_PERMISSION:
    case ResponseCode.NO_BUYER_ID:
    case ResponseCode.NOT_IN_CURRENT_UNIT:
    continue;
    default:
    if (sendResult != null) {
    return sendResult;
    }
    throw e;
    }
    } catch (InterruptedException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
    throw e;
    }
    } else {
    break;
    }
    }
    // ... 省略异常信息封装逻辑
    throw mqClientException;
    }
    }

this.tryToFindTopicPublishInfo(msg.getTopic()) 方法分析

Producer在进行消息发送的时候,因为要选择对应的Broker以及消息要发送到哪个MessageQueue,所以在发送消息之前要先获取到当前消息Topic对应的订阅信息,
首先会先到 ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable ,获取,如果获取到,则直接返回对应的TopicPublishInfo,如果获取不到,则会调用updateTopicRouteInfoFromNameServer从Nameserver中获取,并更新topicPublishInfoTable,然后返回。

TopicPublishInfo主要包含了如下三个属性和两个标识字段:

  1. private List messageQueueList = new ArrayList(); // 对应的消息队列List
  2. private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 选择MessageQueue时计算index,从而对MessageList.size()取模用来获取对应的MessageQueue的
  3. private TopicRouteData topicRouteData; // 路由信息

tryToFindTopicPublishInfo具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 内存获取TopicPublishInfo
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 获取不到,从Nameserver中更新
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}

updateTopicRouteInfoFromNameServer逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 获取路由信息,最终会调用到 getTopicRouteInfoFromNameServer这个方法,后面有介绍
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
// 获取到当前Topic对应的路由信息同燕来的路由信息比较,判断是否有差异,如果有变化,则更新最新的路由信息到topicRouteTable
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}

// Update Pub info
// 更新路由信息(Producer)
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// 省略 Consumer 逻辑,和上面Producer逻辑一样
// 更新路由信息
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
}
} catch (RemotingException e) {
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
}
return false;
}

getTopicRouteInfoFromNameServer 方法解析

从Nameserver中获取TopciRouteInfo信息
在获取对应路由信息时,会先封装一个RequestCode为GET_ROUTEINTO_BY_TOPIC的Request,然后通过this.remotingClient.invokeSync()向nameserver 发起请求,
Nameserver端会通过DefaultRequestProcessor(网络请求处理核心类)中的processRequest处理对应的ReqeustCode的请求逻辑,在processRequest的请求中会对应有一个case RequestCode.GET_ROUTEINTO_BY_TOPIC 的分支,专门用于处理该类请求的逻辑。
(可参考前面Nameserver相关文章了解整个请求的调用过程和处理流程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
// 封装RequestCode为 GET_ROUTEINTO_BY_TOPIC 的request
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
// 网络请求(向Nameserver请求)
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
// 返回不存在,break,直接抛异常
case ResponseCode.TOPIC_NOT_EXIST: {
break;
}
// 返回成功,后,如果body不为空,则返回相应的 TopicRouteData 信息
case ResponseCode.SUCCESS: {
byte[] body = response.getBody();
if (body != null) {
return TopicRouteData.decode(body, TopicRouteData.class);
}
}
default:
break;
}
// break后,直接抛对应responseCode的ClientException
throw new MQClientException(response.getCode(), response.getRemark());
}

sendResult = this.sendKernelImpl 方法分析

源码如下(省略不重要逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo,final long timeout) {
long beginStartTime = System.currentTimeMillis();
// 获取Broker,如果为空,调用tryToFindTopicPublishInfo(上面有介绍)再获取
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}

SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
// ..... 省略Message相关信息设置和一些hook的执行设置
//
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 设置请求头,包含这条消息的所有信息(篇幅过长,此处省略...)
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
// 异步消息相关属性设置以及超时计算和判断,此处省略...
// 消息发送逻辑
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this);
break;
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); // 判断是否超时
// 消息发送逻辑, sendMessage 后面有介绍
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this);
break;
default:
assert false;
break;
}
return sendResult;
} catch (Exception e) {
// 此处省略其他异常的一些处理逻辑...
// 主要是执行一些executeSendMessageHookAfter逻辑
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

this.mQClientFactory.getMQClientAPIImpl().sendMessage 逻辑:

在sendMessage发送逻辑中,针对同步消息和异步消息,在最有分别调用了各自的实现方法,分别是

  1. 同步:
    this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request)
  2. 异步:
    this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer);

同步消息发送处理逻辑:

同步消息在发送会调用到this.invokeSyncImpl(channel, request, timeoutMillis - costTime),
该逻辑中主要是channel.writeAndFlush(request)操作,以及同步阻塞(通过countDownLatch实现),需要注意的是countDownLatch是有超时时间控制的,当达到超时的限制后,如果response还能没有回来,则直接抛异常给Client,Client可能会存在误判的场景。
具体逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis){
// ReqeustId 每个请求都会有一个唯一的请求Id,便于response回来后找到对应的请求
final int opaque = request.getOpaque();
try {
// 创建ResponseFuture对象,包含了请求信息和通道信息
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// channel.writeAndFlush(request) 完成消息发送
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
// 监听消息处理完成的ChannelFutureListener
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// 判断request是否处理成功,成功与否设置对应的发送状态
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
// putResponse里面主要是对waitResponse里面的countDownLatch进行了countDown()操作,
// 因为已经得到了返回结果,应对await操作放行
responseFuture.putResponse(null);
}
});
// 如果是同步消息,则阻塞朱当前线程,因为channel.writeAndFlush是异步操作
// responseFuture.waitResponse方法内部其实是一个 countDownLatch.await(long timeoutMillis) 操作
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
this.responseTable.remove(opaque);
}
}

异步消息发送处理逻辑:

对于异步消息而言,和同步消息类似,唯一不同的地方是,异步消息在发送后,没有responseFuture.waitResponse(timeoutMillis)这一步操作,具体的成功与失败状态的返回是在处理回调逻辑中完成的

异步消息回调函数执行逻辑;

前面有提到,因为异步消息在发送消息时,没有在发送消息时等待对应的发送状态。
异步消息的发送处理结果有对应的处理逻辑,处理逻辑如下:

1
2
3
4
5
6
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
processMessageReceived(ctx, msg);
}
}

NettyClientHandler是在Producer启动的时候,调用 this.mQClientAPIImpl.start() 时,会执行remotingClient.start()方法,该方法在执行的时候,在初始化Channel的时候会向ChannelPipeline注入NettyClientHandler,用来处理REQUEST_COMMAND 或者RESPONSE_COMMAND。
对于客户端请求Broker而言,发送过来的请求,在执行的时候,会执行到 RESPONSE_COMMAND 分支,REQUEST_COMMAND 主要是用来处理服务端向客户端发起的请求,例如 事务回查和消息Rebalence等操作

processResponseCommand逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
final int opaque = cmd.getOpaque();
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseTable.remove(opaque);
if (responseFuture.getInvokeCallback() != null) {
// 执行回调逻辑
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
responseFuture.release();
}
} else {
log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
log.warn(cmd.toString());
}
}

**executeInvokeCallback **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void executeInvokeCallback(final ResponseFuture responseFuture) {
// 表示是否向线程池中提交任务成功,如果提交失败,则通过主线程完成回调任务的执行
boolean runInThisThread = false;
// CallbackExecutor 线程池
ExecutorService executor = this.getCallbackExecutor();
if (executor != null) {
try {
executor.submit(new Runnable() {
@Override
public void run() {
try {
// 执行回调任务
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("execute callback in executor exception, and callback throw", e);
} finally {
responseFuture.release();
}
}
});
} catch (Exception e) {
runInThisThread = true;
log.warn("execute callback in executor exception, maybe executor busy", e);
}
} else {
runInThisThread = true;
}
// 如果 CallbackExecutor 拒绝该任务,则通过主线程处理executeInvokeCallback逻辑
if (runInThisThread) {
try {
responseFuture.executeInvokeCallback();
} catch (Throwable e) {
log.warn("executeInvokeCallback Exception", e);
} finally {
responseFuture.release();
}
}
}

responseFuture.executeInvokeCallback();
在执行该逻辑的时候,会调用到 sendMessageAsync中 remotingClient.invokeAsync方法参数中的 InvokeCallback 这个回调函数中的 operationComplate(ResponseFuture responseFuture) 方法,
执行到该方法中的处理逻辑后,根据消息的处理结果去调用Callback中的onSuccess函数或者onException函数,处理结果类似同步消息处理逻辑,参考同步消息的处理


RocketMQ——同步消息、异步消息发送源码分析
http://yoursite.com/post/efbf9e99.html/
Author
Chase Wang
Posted on
September 17, 2022
Licensed under