天天看點

【RocketMQ源碼分析】Namesrv

一、下載下傳源碼

從github拉取rocketmq代碼:https://github.com/apache/rocketmq.git,目前最新的版本是4.5.2.我用的是ideal,可以看到rocketmq源碼結構如下:

【RocketMQ源碼分析】Namesrv

rocketmq共包含9個子產品,解釋一下各個子產品的功能和作用:

  • rocketmq-common:通用的枚舉、基類方法、或者資料結構,包名有admin、consumer、filter、hook、message
  • rocketmq-remoting:使用netty的用戶端、服務端,使用fastjson序列化,自定義二進制協定
  • rocketmq-srvutil:隻有一個ServerUtil類,隻提供Server程式依賴,盡可能減少用戶端依賴
  • rocketmq-store:消息存儲,索引,consumerLog,commitLog等
  • rocketmq-client:消息發送和接收,包含consumer和producer
  • rocketmq-filtersrv:消息過濾器
  • rocketmq-broker:服務端,接受消息,存儲消息,consumer拉取消息
  • rocketmq-tools:指令行工具
  • rocketmq-namesrv:NameServer,類似服務注冊中心,broker在這裡注冊,consumer和producer在這裡找到broker位址

可以看到rocketmq網絡通訊是建立在netty上的,netty是一款高性能的異步非阻塞的網絡通訊架構,很多開源的中間件(dubbo/rocketmq)網絡通訊層都是基于netty開發的.

二、NameSrv源碼詳解

首先,看一下rocketmq-namesrv子產品,代碼量不多,一共8個類,從面子上可以看出啟動入口:NamesrvStartup

核心功能

NameServer相當于配置中心,維護Broker叢集、Broker資訊、Broker存活資訊、主題與隊列資訊等。Broker向它注冊路由資訊,同時Client向其擷取路由資訊。NameServer本身是沒有狀态的,NameServer彼此之間不通信,每個Broker與叢集内所有的Nameserver保持長連接配接。如果使用過Zookeeper,就比較容易了解了,但是功能比Zookeeper弱。

NameServer定時任務每10s掃描一下失效的Broker,超過2分鐘沒有收到Broker的心跳包,則關閉連接配接 。

  • NamesrvStartup:NamesrvStartup是一個啟動類,主要邏輯都在處理指令行和配置
  • NamesrvController:NameserController 是 NameServer 子產品的核心控制類
  • NamesrvConfig:主要指定 nameserver 的相關配置屬性:

    kvConfigPath(kvConfig.json)。

    mqhome/namesrv/namesrv.properties。

    orderMessageEnable,是否開啟順序消息功能,預設為false。

  • KVConfigManager:讀取或變更NameServer的配置屬性,加載 NamesrvConfig 中配置的配置檔案到記憶體,此類一個亮點就是使用輕量級的非線程安全容器,再結合讀寫鎖對資源讀寫進行保護。盡最大程度提高線程的并發度。
  • RouteInfoManager :NameServer 資料的載體,記錄 Broker、Topic 等資訊。
  • BrokerHousekeepingService:實作 ChannelEventListener接口,可以說是通道在發送異常時的回調方法
NamesrvStartup啟動器

NamesrvStartup是一個啟動類,主要邏輯都在處理指令行和配置,主要功能都是在NamesrvController中;

public class NamesrvStartup {

    private static InternalLogger log;
    private static Properties properties = null;
    private static CommandLine commandLine = null;

    public static void main(String[] args) {
        main0(args);
    }

    public static NamesrvController main0(String[] args) {

        try {
            //初始化NamesrvController
            NamesrvController controller = createNamesrvController(args);
            // 啟動NamesrvController
            start(controller);
            String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
            log.info(tip);
            System.out.printf("%s%n", tip);
            return controller;
        } catch (Throwable e) {
            e.printStackTrace();
            System.exit(-1);
        }

        return null;
    }

           

createNamesrvController:建立NamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
        //PackageConflictDetect.detectFastjson();

        // 根據指令行參數,使用commons-cli指令行工具包解析生成CommandLine對象
        // 在parseCmdLine中,如果指令行中有-h選項,執行列印幫助文檔的邏輯,然後退出,不再繼續
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
            return null;
        }

        // 初始化了2個配置 NamesrvConfig,NettyServerConfig,其中NettyServerConfig監聽9876是寫死的
        // 然後通過指令行參數 -c 指定一個配置檔案,然後将配置檔案中的内容解析成NamesrvConfig,NettyServerConfig的配置
        // 設定NamesrvConfig,NettyServerConfig的邏輯是看類中的set方法,如果set方法後的名字和配置檔案中的key比對,就會設定對應的值
        final NamesrvConfig namesrvConfig = new NamesrvConfig();
        final NettyServerConfig nettyServerConfig = new NettyServerConfig();
        nettyServerConfig.setListenPort(9876);
        if (commandLine.hasOption('c')) {
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                InputStream in = new BufferedInputStream(new FileInputStream(file));
                properties = new Properties();
                properties.load(in);
                MixAll.properties2Object(properties, namesrvConfig);
                MixAll.properties2Object(properties, nettyServerConfig);

                namesrvConfig.setConfigStorePath(file);

                System.out.printf("load config properties file OK, %s%n", file);
                in.close();
            }
        }

        //如果指定了 -p 選項,會在控制台列印配置資訊,然後退出,不再繼續執行
        if (commandLine.hasOption('p')) {
            InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
            MixAll.printObjectProperties(console, namesrvConfig);
            MixAll.printObjectProperties(console, nettyServerConfig);
            System.exit(0);
        }

        //将啟動指令行的參數配置設定到NamesrvConfig中
        MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

        // 檢查必須設定RocketMQHome
        // 在NamesrvConfig中,可以看到使用系統屬性rocketmq.home.dir,環境變量ROCKETMQ_HOME和前面的-c指定的配置檔案設定RocketMQHome
        // 在mqnamesrv啟動腳本中會自定探測RockerMQ并export ROCKETMQ_HOME
        if (null == namesrvConfig.getRocketmqHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
            System.exit(-2);
        }

        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");

        log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

        // 使用logback列印NamesrvConfig,NettyServerConfig配置資訊
        MixAll.printObjectProperties(log, namesrvConfig);
        MixAll.printObjectProperties(log, nettyServerConfig);

        final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

        // 最後還把-c指定的檔案的配置在儲存到Configruation中
        // remember all configs to prevent discard
        controller.getConfiguration().registerConfig(properties);

        return controller;
    }
           

NamesrvController初始化完成後,就調用start(controller),才真正的開始:

// 啟動NamesrvController
start(controller);
           

啟動NamesrvController,start(controller)方法中有兩個核心的方法:

public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        //NamesrvController初始化
        boolean initResult = controller.initialize();
      
        //啟動NamesrvController
        controller.start();

        return controller;
    }
           

NamesrvController初始化:

public boolean initialize() {

        // 從NamesrvConfig#KvConfigPath指定的檔案中反序列化資料到KVConfigManager#configTable中
        this.kvConfigManager.load();

        // 啟動網絡通信的Netty服務
        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        // 初始化一下負責處理Netty網絡互動資料的線程池,
        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        // 注冊一個預設負責處理Netty網絡互動資料的DefaultRequestProcessor,這個Processor會使用remotingExecutor執行
        // *劃重點,後面這裡會再次提到*
        this.registerProcessor();

        // 每10s掃描一下失效的Broker
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        // 每10min列印一下前面被反複蹂躏的配置
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }
           

啟動NamesrvController:

public void start() throws Exception {
        //啟動netty
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }
           

看看NettyRemotingServer的start()方法的具體實作:

public void start() {
        // 初始化一個線程池,用于執行共享的Handler
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
// 這裡有一個channelEventListener,在NamesrvController中channelEventListener就是BrokerHousekeepingService,BrokerHousekeepingService負責在broker斷開連接配接的時候,移除RouteInfoManager中的路由資訊
        // NettyEventExecutor會維護一個NettyEvent的隊列,NettyConnectManageHandler會向NettyEvent的隊列中添加Event,然後由channelEventListener進行消費
        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
// 定時掃描responseTable,執行逾時請求的callback
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
           

整個過程大概如下:

【RocketMQ源碼分析】Namesrv

繼續閱讀