天天看點

RocketMQ-4.5.1 部署及挂起腳本

安裝

  • 涉及應用

    jdk-8u144-linux-x64.tar.gz

    rocketmq-all-4.5.1-bin-release.zip

  • 下載下傳位址

    JDK

    RocketMQ

  • 步驟
    1. 使用應用使用者進行操作,如:rmq使用者,家目錄 /opt/app/rmq
    2. 解包至家目錄
    unzip rocketmq-all-4.5.1-bin-release.zip -d /opt/app/rmq/ && \
    mv /opt/app/rmq/rocketmq-all-4.5.1-bin-release/* /opt/app/rmq/ && \
    rm -r rocketmq-all-4.5.1-bin-release
               
    1. 建立相關檔案夾
    mkdir -p mkdir /opt/app/rmq/store/commitlog
               
    1. 初始化 mqnamesrv.conf 和 mqbroker.conf
    sh bin/mqnamesrv -p > /opt/app/rmq/conf/mqnamesrv.conf && \
    sh bin/mqbroker -p > /opt/app/rmq/conf/mqbroker.conf
               
    1. 配置 mqnamesrv.conf,mqbroker.conf

      mqnamesrv.conf

    rocketmqHome=/opt/app/rmq
    kvConfigPath=/opt/app/rmq/namesrv/kvConfig.json
    listenPort=9876
    serverWorkerThreads=8
    serverCallbackExecutorThreads=0
    serverSelectorThreads=3
    serverOnewaySemaphoreValue=256
    serverAsyncSemaphoreValue=64
    serverChannelMaxIdleTimeSeconds=120
    serverSocketSndBufSize=2048
    serverSocketRcvBufSize=1024
    serverPooledByteBufAllocatorEnable=false
    
               
    mqbroker.conf
    rocketmqHome=/opt/app/rmq
    namesrvAddr=192.168.0.78:9876;192.168.0.79:9876;192.168.0.80:9876;192.168.0.81:9876;192.168.0.82:9876;192.168.0.55:9876;192.168.0.53:9876;192.168.0.51:9876
    brokerIP1=192.168.0.78
    brokerIP2=192.168.0.78
    brokerName=RocketMQ01
    brokerClusterName=DefaultCluster
    brokerId=0
    brokerPermission=6
    defaultTopicQueueNums=24
    autoCreateTopicEnable=true
    clusterTopicEnable=true
    brokerTopicEnable=true
    autoCreateSubscriptionGroup=true
    sendMessageThreadPoolNums=80
    pullMessageThreadPoolNums=48
    adminBrokerThreadPoolNums=16
    clientManageThreadPoolNums=16
    flushConsumerOffsetInterval=5000
    flushConsumerOffsetHistoryInterval=60000
    rejectTransactionMessage=false
    fetchNamesrvAddrByAddressServer=false
    sendThreadPoolQueueCapacity=100000
    pullThreadPoolQueueCapacity=100000
    filterServerNums=0
    longPollingEnable=true
    shortPollingTimeMills=1000
    notifyConsumerIdsChangedEnable=true
    offsetCheckInSlave=false
    listenPort=10911
    serverWorkerThreads=8
    serverCallbackExecutorThreads=0
    serverSelectorThreads=3
    serverOnewaySemaphoreValue=256
    serverAsyncSemaphoreValue=64
    serverChannelMaxIdleTimeSeconds=120
    serverSocketSndBufSize=131072
    serverSocketRcvBufSize=131072
    serverPooledByteBufAllocatorEnable=false
    clientWorkerThreads=4
    clientCallbackExecutorThreads=16
    clientOnewaySemaphoreValue=2048
    clientAsyncSemaphoreValue=65535
    connectTimeoutMillis=3000
    channelNotActiveInterval=60000
    clientChannelMaxIdleTimeSeconds=120
    clientSocketSndBufSize=131072
    clientSocketRcvBufSize=131072
    clientPooledByteBufAllocatorEnable=false
    storePathRootDir=/opt/app/rmq/store
    storePathCommitLog=/opt/app/rmq/store/commitlog
    mapedFileSizeCommitLog=1073741824
    mapedFileSizeConsumeQueue=6000000
    flushIntervalCommitLog=1000	
    flushCommitLogTimed=false
    flushIntervalConsumeQueue=1000
    cleanResourceInterval=10000
    deleteCommitLogFilesInterval=100
    deleteConsumeQueueFilesInterval=100
    destroyMapedFileIntervalForcibly=120000
    redeleteHangedFileInterval=120000
    deleteWhen=04
    diskMaxUsedSpaceRatio=75
    fileReservedTime=48
    putMsgIndexHightWater=600000
    maxMessageSize=524288
    checkCRCOnRecover=true
    flushCommitLogLeastPages=4
    flushConsumeQueueLeastPages=2
    flushCommitLogThoroughInterval=10000
    flushConsumeQueueThoroughInterval=60000
    maxTransferBytesOnMessageInMemory=262144
    maxTransferCountOnMessageInMemory=32
    maxTransferBytesOnMessageInDisk=65536
    maxTransferCountOnMessageInDisk=8
    accessMessageInMemoryMaxRatio=40
    messageIndexEnable=true
    maxHashSlotNum=5000000
    maxIndexNum=20000000
    maxMsgsNumBatch=64
    messageIndexSafe=false
    haListenPort=10912
    haSendHeartbeatInterval=5000
    haHousekeepingInterval=20000
    haTransferBatchSize=32768
    haMasterAddress=
    haSlaveFallbehindMax=268435456
    brokerRole=ASYNC_MASTER
    flushDiskType=ASYNC_FLUSH
    syncFlushTimeout=5000
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    flushDelayOffsetInterval=10000
    cleanFileForciblyEnable=true
    useReentrantLockWhenPutMessage=true
    registerBrokerTimeoutMills=30000
               

    修改 mqnamesrv.conf 的 rocketmqHome 為 /opt/app/rmq

    修改 mqnamesrv.conf 的 listenPort 為 9876

    修改 mqbroker.conf 的 namesrvAddr 字段為 Rmq 叢集的位址

    修改 mqbroker.conf 的 brokerIP1、brokerIP2 為本機位址

    修改 mqbroker.conf 的 brokerName 為本機 hostname

    修改 mqbroker.conf 的 storePathRootDir 為 /opt/app/rmq/store

    修改 mqbroker.conf 的 storePathCommitLog 為 /opt/app/rmq/store/commitlog

    1. 啟動
    必須先啟動 NameServer,再啟動 Broker
    nohup sh /opt/app/rmq/bin/mqnamesrv -c /opt/app/rmq/conf/mqnamesrv.conf &>> /opt/app/rmq/logs/namesrv.log 2>&1 &
    nohup sh /opt/app/rmq/bin/mqbroker -c /opt/app/rmq/conf/mqbroker.conf &>> /opt/app/rmq/logs/broker.log 2>&1 &
               
    啟動成功應列印如下日志
    The broker[HOSTNAME, IP:10911] boot success. serializeType=JSON and name server is 叢集伺服器 IP:9876
    1. 自動挂啟腳本
    #!/bin/bash
    #Check RocketMQ Cluster Status
    ##########[RocketMQ Cluster]##########
    port1=`netstat -tnlp | grep -o "9876"`
    port2=`netstat -tnlp | grep -o "10911"`
    port3=`netstat -tnlp | grep -o "10912"`
    if [ "$port1" == '9876' -a "$port2" == '10911' -a "$port3" == '10912' ];then
    	echo "Rmq Cluster is no problem " &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
    elif [ "$port1" == '9876' -a -z "$port2" -a -z "$port3" ];then
    	echo "Rmq Cluster Broker is Exception" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
    	source /etc/profile &>> /dev/null
    	nohup sh /opt/app/rmq/bin/mqbroker -c /opt/app/rmq/conf/mqbroker.conf &>> /opt/app/rmq/logs/broker.log 2>&1 &
    elif [ -z "$port1" -a "$port2" == '10911' -a "$port3" == '10912' ];then
    	 echo "Rmq Cluster NameServer is Exception" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
    	 kill -9 `ps aux | grep "rocketmq.broker" | egrep -v "(grep|runbroker)" | awk '{print $2}'`
    	 if [[ $? == 0 ]];then
    		sleep 3
    		 Broker_Port1=`netstat -tnlp | grep -o '10911'`
    		 Broker_Port2=`netstat -tnlp | grep -o '10912'`
      			if [ -z $Broker_Port1 -a -z $Broker_Port2 ];then
       				 source /etc/profile &>> /dev/null
        			 nohup sh /opt/app/rmq/bin/mqnamesrv -c /opt/app/rmq/conf/mqnamesrv.conf &>> /opt/app/rmq/logs/namesrv.log 2>&1 &
      				 echo "Rmq Cluster Start NameServer Success" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
        			 if [[ $? == 0 ]];then
          				sleep 3
         				Server_Port=`netstat -tnlp | grep -o '9876'`
          			if [ "$Server_Port" == '9876' ];then
            		source /etc/profile &>> /dev/null
            		nohup sh /opt/app/rmq/bin/mqbroker -c /opt/app/rmq/conf/mqbroker.conf &>> /opt/app/rmq/logs/broker.log 2>&1 &
            		echo "Rmq Cluster Start Broker Success" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
          			fi
        		else
          			echo "Rmq Cluster Start NameServer Failed" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
        		fi
      		fi
    	else
    		echo "Kill Rmq Broker Service Failed" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
    	fi
    elif [ -z "$port1" -a -z "$port2" -a -z "$port3" ];then
    	echo "Rmq Cluster NameServer and Broker is Exception" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
    	source /etc/profile &>> /dev/null
    	nohup sh /opt/app/rmq/bin/mqnamesrv -c /opt/app/rmq/conf/mqnamesrv.conf &>> /opt/app/rmq/logs/namesrv.log 2>&1 &
    	if [[ $? == 0 ]];then
    		sleep 3
    		Server_Port=`netstat -tnlp | grep -o '9876'`
    		if [ "$Server_Port" == '9876' ];then
     			echo "Rmq Cluster NameServer and Broker is Exception,then start NameServer Success" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
     			source /etc/profile &>> /dev/null
     			nohup sh /opt/app/rmq/bin/mqbroker -c /opt/app/rmq/conf/mqbroker.conf &>> /opt/app/rmq/logs/broker.log 2>&1 &
      			sleep 3
      			Broker_Port1=`netstat -tnlp | grep -o '10911'`
      			Broker_Port2=`netstat -tnlp | grep -o '10912'`
      			if [ "$Broker_Port1" == '10911' -a "$Broker_Port2" == '10912' ];then
      			echo "Rmq Cluster NameServer and Broker is Exception,then start Broker Success" &>> /opt/app/rmq/port/rmq/"$date"_rmq_cluster.txt
      			fi
    		fi
    	fi
    fi
    find /opt/app/rmq/port/zabbix/ -type f -mtime 3 -exec rm -rf {} \;
    find /opt/app/rmq/port/rmq/ -type f -mtime 3 -exec rm -rf {} \;
               
    1. 停止
    重新開機叢集的話,所有伺服器均需執行
    sh bin/mqshutdown namesrv && \
    sh bin/mqshutdown broker
               
    1. 清理
    會将 TOPIC 删除
    rm -rf /opt/app/rmq/sotre/*
               

指令

  • 檢視本機建立過的 Topic
  • 檢視本機所在隊列叢集的資訊
  • 檢視隊列所有消費組的消費情況

指令後追加

-g 消費組名

來查詢指定消費組,組名為 ECID_APID

  • 檢視指定消費組的消費者數量/連接配接

替換 GroupName,組名為 ECID_APID

sh /opt/app/rmq/bin/mqadmin consumerConnection  -n "`ifconfig | grep '255.255.255.0' | awk '{print substr($2,6)}' | grep -v '^172'`:9876" -g GroupName
           
  • 為叢集所有伺服器建立隊列

替換 TOPIC

sh /opt/app/rmq/bin/mqadmin updateTopic -n "`grep namesrvAddr /opt/app/rmq/conf/mqbroker.conf | sed 's/namesrvAddr=//'`" -c DefaultCluster -t "TOPIC" -r 12 -w 12
           
  • 删除指定隊列

替換 TOPIC

參考資料

RocketMQ 指令參考文檔

RocketMQ——角色與術語詳解

RocketMQ 中 Topic、Tag、GroupName 的設計初衷