RocketMQ——NameServer消息路由和Broker管理

Nameserver 作用

  RocketMQ架构体系中,是承担注册中心这个角色的,主要有两个作用:

  • 消息路由管理

    Broker在启动后,会定时向Nameserver上报Broker相关的信息,Nameserver在收到Broker上报的信息后,会将这些信息保存在内存中(通过RouteInfoManager管理),供Producer和Consumer使用,达到消息路由的目的

  • Broker管理

    Broker在启动的时候,会向Nameserver集群中的每一个节点注册Broker相关的节点信息,并定期向每个Nameserver节点上报Broker相关的信息,便于后续Producer和Consumer收发消息。同时Nameserver在启动后,会定时的剔除一些下线的Broker

消息路由管理

  本文结合Nameserver和Broker启动过程来介绍Nameserv是怎么实现消息路由,重点查看Nameserver的启动流程,Broker启动流程只查看相关逻辑

NameServer 启动流程

启动类:NamesrvStartup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static NamesrvController main0(String[] args) {
try {
// 创建NamesrvController
NamesrvController controller = createNamesrvController(args);
// NameServer启动
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
createNamesrvController(args);

  如下是createNamesrvController相关的一些逻辑,由于代码篇幅过程,会尽可能精简掉一些无用的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {

// 通过面向对象的思想,将JVM命令相关的一些参数封装成CommandLine对象
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
// 创建Namesrv和NettyServer的配置对象
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
// c 为启动过程中JVM中参数标记 如 -c D:\\dev\\rocketmq\\conf\\myport.txt 可以在myport文件中自定义端口信息
// 将对应配置信息解析到namesrvConfig 和 nettyServerConfig中
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
in.close();
}
}
// p nameserver启动过程中提供的一个打印namesrv和nettyServer对应参数的配置参数,无实际意义
if (commandLine.hasOption('p')) {
// 一些打印操作,打印完后退出程序 省略
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 日志管理
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 创建NamesrvController对象,并返回
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

controller.getConfiguration().registerConfig(properties);

return controller;
}
start(controller);

  createNamesrvController(args) 创建完NamesrvController对象后,通过start方法来完成该controller的初始化工作和启动工作

start 启动源码(核心逻辑)

1
2
3
4
5
6
7
public static NamesrvController start(final NamesrvController controller) throws Exception {
// 初始化工作
boolean initResult = controller.initialize();
// 启动工作
controller.start();
return controller;
}

controller 初始化:controller.initialize()(核心逻辑)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public boolean initialize() {
this.kvConfigManager.load(); // 加载KVConfig,包含一些持久化的顺序消费Topic信息
// 创建远程调用服务 NettyRemotingServer
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建远程调用线程池
this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册网络通信核心类 RequestProcessor (后面有介绍)
this.registerProcessor();
// 定时清理不工作的Broker,后面Broker管理会介绍到
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 省略不重要逻辑
return true;
}

controller.initialize() 下的 registerProcessor()

  registerProcessor 会注册一个 RequestProcessor, 该类的 父类为 NettyRequestProcessor,是负责网络通信的核心类

NettyRequestProcessor 定义了两个接口方法:

1
2
3
4
5
6
public interface NettyRequestProcessor {
// 处理请求
RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;
// 拒绝请求
boolean rejectRequest();
}

DefaultRequestProcessor 中核心方法 processRequest:

RequestProcessor 收到请求后,会根据具体的请求编码请求对应的请求逻辑

本章节主要讨论 RequestCode.REGISTER_BROKER,其他逻辑省略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)  {

switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
// 省略。。。
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
// 不同版本调用的registerBroker方法不一样,以高版本为准(后续有介绍)
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {

return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
// 省略。。。
default:
break;
}
return null;
}

controller.initialize() 下的 registerProcessor() 中 processRequest() 下 registerBrokerWithFilterServer ()

  RequestProcessor 处理RequestCode.REGISTER_BROKER请求的时候,会调用registerBrokerWithFilterServer方法,该方法中拿到请求后,会对请求数据进行对应的解码操作,得到Broker相关的信息后,会维护RouteInfoMananger信息,从而达到消息路由的目的

核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) {
// 请求信息校验、解码操作,操作后会拿到一个 requestHeader,这个对象中包含Broker相关的一系列信息,如下:

RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());

return response;
}

/**
该逻辑中,通过namesrvController.getRouteInfoManager(),得到 RouteInfoManager 对象
然后调用 registerBroker ,将 broker 相关的信息维护到 RouteInfoManager 对象中的一系列Map中
*/

RouteInfoManager 对象存储路由信息的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RouteInfoManager {
// Namesrv 是一个读多写少的组件,所以这儿用到的是一把读写锁
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 记录Topic + 队列信息的对应关系
// QueueData 的对应属性为 brokerName readQueueNums writeQueueNums perm topicSynFlag
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 记录Broker + Broker相关的信息
// BrokerData 对应的属性为 cluster brokerName brokerAddrs
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 记录集群 和 集群下Broker 的映射关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 记录 BrokerAddr 和 BrokerLiveInfo 的映射关系
// BrokerLiveInfo 的属性为 lastUpdateTimestamp dataVersion channel haServerAddr
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 记录brokerAddr 和 Filterserver List的映射关系
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
/*
通过这个映射关系,Producer和Consumer 就可以通过一个Topic 来获取到对用的 QueueData,
然后根据 QueueData中的Broker信息进而获取到完整的Broker相关的一系列信息,
从而可以知道Producer 消息发到哪个Broker,Consumer从哪个Broker中拉取消息,达到消息路由的目的
*/

controller 启动:controller.start()(核心逻辑)

1
2
3
4
5
6
7
8
9
10
public void start() throws Exception {
// 远程调用服务启动
// Netty 启动
this.remotingServer.start();
// 启动 fileWatchService 线程
// 调用 FileWatchService 类中的 run 方法,主要是用来监听文件变化的服务
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}

Broker启动(注册相关逻辑)

启动类:BrokerStartup

入口方法

1
2
3
public static void main(String[] args) {
start(createBrokerController(args));
}

createBrokerController(args)

  该方法可以参考Namesrv的createNamesrvConller的部分逻辑,该方法主要的作用是完成BrokerController的创建工作和BrokerController的初始化工作,本文重点为Namesrv的内容,故该章节只介绍和向Namesrv注册和上报心跳的核心逻辑。

start(createBrokerController(args))

  createBrokerController创建玩 BrokerController 后,通过该方法完成启动操作

controller.start() 源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
*
*/
public void start() throws Exception {
// 省略 ...

if (this.brokerOuterAPI != null) {
// 启动 Broker 对外请求的 服务,Broker 向 Namesrv注册就是通过brokerOuterAPI发起Request请求的
this.brokerOuterAPI.start();
}

if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 注册Broker
this.registerBrokerAll(true, false, true);
}
// 每 30s 发起一次注册 操作
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

// 省略...
}

registerBrokerAll 方法中有一个核心处理方法 doRegisterBrokerAll,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {
// 通过 前面启动的brokerOuterAPI 完成注册操作,具体源码后续会介绍
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());

// 注册完成后有一些更新同步操作,省略
}

registerBrokerAll 核心逻辑

registerBrokerAll 逻辑中可以看到,显示获取到所有的nameserver 地址,Broker在启动后,是便利nameserver所有的地址,向每一个nameserver进行注册上报的,从这儿就可以看出,每一个nameserver中是包含broker全量的信息的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean compressed) {

final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
// 封装 requestHeader和requestBody
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);

RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
// 通过 CountDownLatch 来阻塞,Broker 向所有的Nameserver都完成注册后,才能结束该流程(除非超时)
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
// 便利Nameserver集群中的每一个namesrvAddr
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
// Broker 向单个Nameserver 进行注册操作
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
}
return registerBrokerResultList;
}

registerBroker:向单个Nameserver进行注册

registerBroker逻辑中,会拿着之前封装好的requestHeader,创建一个 RequestCode.REGISTER_BROKER 的 RemotingCommand 请求对象,通过 remotingClient 完成上报请求操作

这样Nameserver在收到请求后,就会在上面介绍到的 RequestProcesser 中的 processRequest 方法中 根据 RequestCode.REGISTER_BROKER 请求编码拿到对应的请求,完成Broker注册和路由管理的动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body ) ,
InterruptedException {
// 创建 RequestCode.REGISTER_BROKER 类型的 RemotingCommand,
// 同 Namesrv 中注册的 RequestProcesser中的 RequestCode.REGISTER_BROKER 找到匹配对应关系,完成闭环
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);

RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
// 省略 response 相关处理
throw new MQBrokerException(response.getCode(), response.getRemark());
}

Broker管理

Nameserver是通过RouteInfoManager来完成路由管理的,brokerLiveTable 是 RouteInfoManager 类下的一个属性,private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
brokerLiveTable 可以获得所有的Broker信息,该Map结构中的BrokerLiveInfo对象中保存了Broker上报信息到Nameserver中的最新的更新时间,Nameserver就是通过这个时间来完成剔除操作的,BrokerLiveInfo的结构如下:

1
2
3
4
5
6
class BrokerLiveInfo {
private long lastUpdateTimestamp;
private DataVersion dataVersion;
private Channel channel;
private String haServerAddr;
}

  Namesrv在启动的时候会启动一个 scheduleAtFixedRate , 该线程会每10秒调用一次scanNotActiveBroker方法,用来清理不工作的Broker(心跳上报不及时的),源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}

scanNotActiveBroker 在执行的时候,会便利所有的brokerLiveTable中的BrokerLiveInfo,然后根据BrokerLiveInfo中lastUpdateTimestamp 属性来判断最新一次更新的时间戳同当前时间戳对比是否在配置的阈值(BROKER_CHANNEL_EXPIRED_TIME=2min)范围内,如果超过这个阈值,则剔除该Broker并Close对应通道数据

总结

NameServer和Broker启动步骤(精简)

  说明:Namesrv在启动的时候,会先创建一个NamesrvController,然后会调用以恶initailize方法,在该方法内会注册一个RequestProcessor(网络通信服务),该方法实现了两个接口,rejectRequest和processRequest,processRequest中在处理请求的时候会判断请求编码,例如请求编码是 REGISTER_BROKER。 而Broker在启动的时候同样会有一系列创建和初始化方法,创建初始化完成后, 会将当前启动的Broker分布注册到每一个Namesrv节点(同时会启动一个定时任务定期向Namesrv定时上报信息),这个注册节点的请求编码就是REGISTER_BROKER,这样Broker注册请求和Namesrv处理请求就能找到对应关系从而完成注册操作。 Namesrv在启动后会启动一个定时线程去清理一些信息上报不及时的Broker(2min),Broker在上报信息到Nameserver的时候,会在RoutInfoManager中维护一个BrokerLiveInfo,这个对象中会记录最新上报信息的时间,Nameserver就是通过这个时间来完成剔除操作的。


RocketMQ——NameServer消息路由和Broker管理
http://yoursite.com/post/c363dfc.html/
Author
Chase Wang
Posted on
August 10, 2022
Licensed under