Nameserver 作用 RocketMQ架构体系中,是承担注册中心这个角色的,主要有两个作用:
消息路由管理
Broker在启动后,会定时向Nameserver上报Broker相关的信息,Nameserver在收到Broker上报的信息后,会将这些信息保存在内存中(通过RouteInfoManager管理),供Producer和Consumer使用,达到消息路由的目的
Broker管理
Broker在启动的时候,会向Nameserver集群中的每一个节点注册Broker相关的节点信息,并定期向每个Nameserver节点上报Broker相关的信息,便于后续Producer和Consumer收发消息。同时Nameserver在启动后,会定时的剔除一些下线的Broker
消息路由管理 本文结合Nameserver和Broker启动过程来介绍Nameserv是怎么实现消息路由,重点查看Nameserver的启动流程,Broker启动流程只查看相关逻辑
NameServer 启动流程 启动类:NamesrvStartup 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static NamesrvController main0 (String[] args) { try { NamesrvController controller = createNamesrvController(args); start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n" , tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1 ); } return null ; }
createNamesrvController(args) ; 如下是createNamesrvController相关的一些逻辑,由于代码篇幅过程,会尽可能精简掉一些无用的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public static NamesrvController createNamesrvController (String[] args) throws IOException, JoranException { Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv" , args, buildCommandlineOptions(options), new PosixParser()); final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); nettyServerConfig.setListenPort(9876 ); if (commandLine.hasOption('c' )) { String file = commandLine.getOptionValue('c' ); if (file != null ) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); in.close(); } } if (commandLine.hasOption('p' )) { } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml" ); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); controller.getConfiguration().registerConfig(properties); return controller; }
start(controller) ; createNamesrvController(args) 创建完NamesrvController对象后,通过start方法来完成该controller的初始化工作和启动工作
start 启动源码(核心逻辑)
1 2 3 4 5 6 7 public static NamesrvController start (final NamesrvController controller) throws Exception { boolean initResult = controller.initialize(); controller.start(); return controller; }
controller 初始化:controller.initialize()(核心逻辑)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public boolean initialize () { this .kvConfigManager.load(); this .remotingServer = new NettyRemotingServer(this .nettyServerConfig, this .brokerHousekeepingService); this .remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_" )); this .registerProcessor(); this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { NamesrvController.this .routeInfoManager.scanNotActiveBroker(); } }, 5 , 10 , TimeUnit.SECONDS); return true ; }
controller.initialize() 下的 registerProcessor()
registerProcessor 会注册一个 RequestProcessor, 该类的 父类为 NettyRequestProcessor,是负责网络通信的核心类
NettyRequestProcessor 定义了两个接口方法:
1 2 3 4 5 6 public interface NettyRequestProcessor { RemotingCommand processRequest (ChannelHandlerContext ctx, RemotingCommand request) throws Exception ; boolean rejectRequest () ; }
DefaultRequestProcessor 中核心方法 processRequest:
RequestProcessor 收到请求后,会根据具体的请求编码请求对应的请求逻辑
本章节主要讨论 RequestCode.REGISTER_BROKER,其他逻辑省略
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public RemotingCommand processRequest (ChannelHandlerContext ctx, RemotingCommand request) { switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG: return this .putKVConfig(ctx, request); case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this .registerBrokerWithFilterServer(ctx, request); } else { return this .registerBroker(ctx, request); } default : break ; } return null ; }
controller.initialize() 下的 registerProcessor() 中 processRequest() 下 registerBrokerWithFilterServer ()
RequestProcessor 处理RequestCode.REGISTER_BROKER请求的时候,会调用registerBrokerWithFilterServer方法,该方法中拿到请求后,会对请求数据进行对应的解码操作,得到Broker相关的信息后,会维护RouteInfoMananger信息,从而达到消息路由的目的
核心逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public RemotingCommand registerBrokerWithFilterServer (ChannelHandlerContext ctx, RemotingCommand request) { RegisterBrokerResult result = this .namesrvController.getRouteInfoManager().registerBroker( requestHeader.getClusterName(), requestHeader.getBrokerAddr(), requestHeader.getBrokerName(), requestHeader.getBrokerId(), requestHeader.getHaServerAddr(), registerBrokerBody.getTopicConfigSerializeWrapper(), registerBrokerBody.getFilterServerList(), ctx.channel()); return response; }
RouteInfoManager 对象存储路由信息的结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class RouteInfoManager { private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final HashMap<String, List<QueueData>> topicQueueTable; private final HashMap<String, BrokerData> brokerAddrTable; private final HashMap<String, Set<String>> clusterAddrTable; private final HashMap<String, BrokerLiveInfo> brokerLiveTable; private final HashMap<String, List<String>> filterServerTable; }
controller 启动:controller.start()(核心逻辑)
1 2 3 4 5 6 7 8 9 10 public void start () throws Exception { this .remotingServer.start(); if (this .fileWatchService != null ) { this .fileWatchService.start(); } }
Broker启动(注册相关逻辑) 启动类:BrokerStartup 入口方法
1 2 3 public static void main (String[] args) { start(createBrokerController(args)); }
createBrokerController(args)
该方法可以参考Namesrv的createNamesrvConller的部分逻辑,该方法主要的作用是完成BrokerController的创建工作和BrokerController的初始化工作,本文重点为Namesrv的内容,故该章节只介绍和向Namesrv注册和上报心跳的核心逻辑。
start(createBrokerController(args))
createBrokerController创建玩 BrokerController 后,通过该方法完成启动操作
controller.start() 源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void start () throws Exception { if (this .brokerOuterAPI != null ) { this .brokerOuterAPI.start(); } if (!messageStoreConfig.isEnableDLegerCommitLog()) { this .registerBrokerAll(true , false , true ); } this .scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run () { try { BrokerController.this .registerBrokerAll(true , false , brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception" , e); } } }, 1000 * 10 , Math.max(10000 , Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000 )), TimeUnit.MILLISECONDS); }
registerBrokerAll 方法中有一个核心处理方法 doRegisterBrokerAll,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void doRegisterBrokerAll (boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) { List<RegisterBrokerResult> registerBrokerResultList = this .brokerOuterAPI.registerBrokerAll( this .brokerConfig.getBrokerClusterName(), this .getBrokerAddr(), this .brokerConfig.getBrokerName(), this .brokerConfig.getBrokerId(), this .getHAServerAddr(), topicConfigWrapper, this .filterServerManager.buildNewFilterServerList(), oneway, this .brokerConfig.getRegisterBrokerTimeoutMills(), this .brokerConfig.isCompressedRegister()); }
registerBrokerAll 核心逻辑
registerBrokerAll 逻辑中可以看到,显示获取到所有的nameserver 地址,Broker在启动后,是便利nameserver所有的地址,向每一个nameserver进行注册上报的,从这儿就可以看出,每一个nameserver中是包含broker全量的信息的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public List<RegisterBrokerResult> registerBrokerAll ( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); List<String> nameServerAddressList = this .remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0 ) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte [] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run () { try { RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null ) { registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK" , brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}" , namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
registerBroker:向单个Nameserver进行注册
registerBroker逻辑中,会拿着之前封装好的requestHeader,创建一个 RequestCode.REGISTER_BROKER 的 RemotingCommand 请求对象,通过 remotingClient 完成上报请求操作
这样Nameserver在收到请求后,就会在上面介绍到的 RequestProcesser 中的 processRequest 方法中 根据 RequestCode.REGISTER_BROKER 请求编码拿到对应的请求,完成Broker注册和路由管理的动作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private RegisterBrokerResult registerBroker ( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte [] body ) , InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); RemotingCommand response = this .remotingClient.invokeSync(namesrvAddr, request, timeoutMills); throw new MQBrokerException(response.getCode(), response.getRemark()); }
Broker管理 Nameserver是通过RouteInfoManager来完成路由管理的,brokerLiveTable 是 RouteInfoManager 类下的一个属性,private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; brokerLiveTable 可以获得所有的Broker信息,该Map结构中的BrokerLiveInfo对象中保存了Broker上报信息到Nameserver中的最新的更新时间,Nameserver就是通过这个时间来完成剔除操作的,BrokerLiveInfo的结构如下:
1 2 3 4 5 6 class BrokerLiveInfo { private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr; }
Namesrv在启动的时候会启动一个 scheduleAtFixedRate , 该线程会每10秒调用一次scanNotActiveBroker方法,用来清理不工作的Broker(心跳上报不及时的),源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 public void scanNotActiveBroker () { Iterator<Entry<String, BrokerLiveInfo>> it = this .brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms" , next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this .onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
scanNotActiveBroker 在执行的时候,会便利所有的brokerLiveTable中的BrokerLiveInfo,然后根据BrokerLiveInfo中lastUpdateTimestamp 属性来判断最新一次更新的时间戳同当前时间戳对比是否在配置的阈值(BROKER_CHANNEL_EXPIRED_TIME=2min)范围内,如果超过这个阈值,则剔除该Broker并Close对应通道数据
总结 NameServer和Broker启动步骤(精简)
说明:Namesrv在启动的时候,会先创建一个NamesrvController,然后会调用以恶initailize方法,在该方法内会注册一个RequestProcessor(网络通信服务),该方法实现了两个接口,rejectRequest和processRequest,processRequest中在处理请求的时候会判断请求编码,例如请求编码是 REGISTER_BROKER。 而Broker在启动的时候同样会有一系列创建和初始化方法,创建初始化完成后, 会将当前启动的Broker分布注册到每一个Namesrv节点(同时会启动一个定时任务定期向Namesrv定时上报信息),这个注册节点的请求编码就是REGISTER_BROKER,这样Broker注册请求和Namesrv处理请求就能找到对应关系从而完成注册操作。 Namesrv在启动后会启动一个定时线程去清理一些信息上报不及时的Broker(2min),Broker在上报信息到Nameserver的时候,会在RoutInfoManager中维护一个BrokerLiveInfo,这个对象中会记录最新上报信息的时间,Nameserver就是通过这个时间来完成剔除操作的。