安裝
-
涉及應用
jdk-8u144-linux-x64.tar.gz
rocketmq-all-4.5.1-bin-release.zip
-
下載位址
JDK
RocketMQ
- 步驟
- 使用應用使用者進行操作,如:rmq使用者,家目錄 /opt/app/rmq
- 解包至家目錄
# Extract the release under the home directory, hoist its contents one level
# up, then remove the now-empty release directory. Absolute paths are used for
# every step so the command works from whichever directory holds the zip.
unzip rocketmq-all-4.5.1-bin-release.zip -d /opt/app/rmq/ && \
  mv /opt/app/rmq/rocketmq-all-4.5.1-bin-release/* /opt/app/rmq/ && \
  rm -r /opt/app/rmq/rocketmq-all-4.5.1-bin-release
- 建立相關檔案夾
# Create the commit-log directory (and its parent store directory).
mkdir -p /opt/app/rmq/store/commitlog
- 初始化 mqnamesrv.conf 和 mqbroker.conf
# Dump the built-in defaults of both services into their config files.
# The relative bin/ paths mean this must be run from /opt/app/rmq.
sh bin/mqnamesrv -p > /opt/app/rmq/conf/mqnamesrv.conf && \
  sh bin/mqbroker -p > /opt/app/rmq/conf/mqbroker.conf
-
配置 mqnamesrv.conf,mqbroker.conf
mqnamesrv.conf
rocketmqHome=/opt/app/rmq
kvConfigPath=/opt/app/rmq/namesrv/kvConfig.json
listenPort=9876
serverWorkerThreads=8
serverCallbackExecutorThreads=0
serverSelectorThreads=3
serverOnewaySemaphoreValue=256
serverAsyncSemaphoreValue=64
serverChannelMaxIdleTimeSeconds=120
serverSocketSndBufSize=2048
serverSocketRcvBufSize=1024
serverPooledByteBufAllocatorEnable=false
mqbroker.conf
rocketmqHome=/opt/app/rmq
namesrvAddr=192.168.0.78:9876;192.168.0.79:9876;192.168.0.80:9876;192.168.0.81:9876;192.168.0.82:9876;192.168.0.55:9876;192.168.0.53:9876;192.168.0.51:9876
brokerIP1=192.168.0.78
brokerIP2=192.168.0.78
brokerName=RocketMQ01
brokerClusterName=DefaultCluster
brokerId=0
brokerPermission=6
defaultTopicQueueNums=24
autoCreateTopicEnable=true
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateSubscriptionGroup=true
sendMessageThreadPoolNums=80
pullMessageThreadPoolNums=48
adminBrokerThreadPoolNums=16
clientManageThreadPoolNums=16
flushConsumerOffsetInterval=5000
flushConsumerOffsetHistoryInterval=60000
rejectTransactionMessage=false
fetchNamesrvAddrByAddressServer=false
sendThreadPoolQueueCapacity=100000
pullThreadPoolQueueCapacity=100000
filterServerNums=0
longPollingEnable=true
shortPollingTimeMills=1000
notifyConsumerIdsChangedEnable=true
offsetCheckInSlave=false
listenPort=10911
serverWorkerThreads=8
serverCallbackExecutorThreads=0
serverSelectorThreads=3
serverOnewaySemaphoreValue=256
serverAsyncSemaphoreValue=64
serverChannelMaxIdleTimeSeconds=120
serverSocketSndBufSize=131072
serverSocketRcvBufSize=131072
serverPooledByteBufAllocatorEnable=false
clientWorkerThreads=4
clientCallbackExecutorThreads=16
clientOnewaySemaphoreValue=2048
clientAsyncSemaphoreValue=65535
connectTimeoutMillis=3000
channelNotActiveInterval=60000
clientChannelMaxIdleTimeSeconds=120
clientSocketSndBufSize=131072
clientSocketRcvBufSize=131072
clientPooledByteBufAllocatorEnable=false
storePathRootDir=/opt/app/rmq/store
storePathCommitLog=/opt/app/rmq/store/commitlog
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=6000000
flushIntervalCommitLog=1000
flushCommitLogTimed=false
flushIntervalConsumeQueue=1000
cleanResourceInterval=10000
deleteCommitLogFilesInterval=100
deleteConsumeQueueFilesInterval=100
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
deleteWhen=04
diskMaxUsedSpaceRatio=75
fileReservedTime=48
putMsgIndexHightWater=600000
maxMessageSize=524288
checkCRCOnRecover=true
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
maxTransferBytesOnMessageInMemory=262144
maxTransferCountOnMessageInMemory=32
maxTransferBytesOnMessageInDisk=65536
maxTransferCountOnMessageInDisk=8
accessMessageInMemoryMaxRatio=40
messageIndexEnable=true
maxHashSlotNum=5000000
maxIndexNum=20000000
maxMsgsNumBatch=64
messageIndexSafe=false
haListenPort=10912
haSendHeartbeatInterval=5000
haHousekeepingInterval=20000
haTransferBatchSize=32768
haMasterAddress=
haSlaveFallbehindMax=268435456
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
syncFlushTimeout=5000
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
flushDelayOffsetInterval=10000
cleanFileForciblyEnable=true
useReentrantLockWhenPutMessage=true
registerBrokerTimeoutMills=30000
修改 mqnamesrv.conf 的 rocketmqHome 為 /opt/app/rmq
修改 mqnamesrv.conf 的 listenPort 為 9876
修改 mqbroker.conf 的 namesrvAddr 字段為 Rmq 叢集的位址
修改 mqbroker.conf 的 brokerIP1、brokerIP2 為本機位址
修改 mqbroker.conf 的 brokerName 為本機 hostname
修改 mqbroker.conf 的 storePathRootDir 為 /opt/app/rmq/store
修改 mqbroker.conf 的 storePathCommitLog 為 /opt/app/rmq/store/commitlog
- 啟動
nohup sh /opt/app/rmq/bin/mqnamesrv -c /opt/app/rmq/conf/mqnamesrv.conf &>> /opt/app/rmq/logs/namesrv.log 2>&1 &
nohup sh /opt/app/rmq/bin/mqbroker -c /opt/app/rmq/conf/mqbroker.conf &>> /opt/app/rmq/logs/broker.log 2>&1 &
啟動成功應列印如下日志
The broker[HOSTNAME, IP:10911] boot success. serializeType=JSON and name server is 叢集伺服器 IP:9876
- 自動挂啟腳本
#!/bin/bash
# Check RocketMQ Cluster Status
#
# Watchdog for the local RocketMQ NameServer and Broker. It inspects the three
# listening ports (9876 NameServer, 10911 Broker, 10912 Broker HA), appends a
# status line to a per-day report file and restarts whatever is missing:
#   all three up          -> report only
#   only NameServer up    -> start the Broker
#   only Broker ports up  -> kill the Broker, start NameServer, then the Broker
#   nothing up            -> start NameServer, then the Broker
# Any other port combination is left untouched, as in the original script.
# Finally, report files that are three or more days old are removed.
#
# Strict mode (set -e / set -u) is deliberately not enabled: /etc/profile is
# sourced below and every failure path is handled explicitly.
##########[RocketMQ Cluster]##########

rmq_home=/opt/app/rmq
report_dir="${rmq_home}/port/rmq"
# The original referenced "$date" without assigning it, so every report went to
# "_rmq_cluster.txt". An exported $date is still honoured; otherwise today's
# date is used (NOTE(review): confirm the YYYYMMDD format suits your tooling).
report_file="${report_dir}/${date:-$(date +%Y%m%d)}_rmq_cluster.txt"

# Make sure the report directory exists before the first write.
mkdir -p "$report_dir"

# Sourced once, up front, instead of before each individual service start.
source /etc/profile &>/dev/null

# Append one message line to today's report file.
log() {
  printf '%s\n' "$1" >>"$report_file" 2>&1
}

# Succeed when a TCP socket is listening on local port $1.
# Compares the port field of the local address exactly, so "9876" no longer
# matches port 19876 or a PID, and dual-stack listeners count as one.
is_listening() {
  netstat -tnl 2>/dev/null |
    awk -v port="$1" '
      $6 == "LISTEN" { n = split($4, part, ":"); if (part[n] == port) found = 1 }
      END { exit !found }
    '
}

# Launch the NameServer in the background.
start_namesrv() {
  nohup sh "${rmq_home}/bin/mqnamesrv" -c "${rmq_home}/conf/mqnamesrv.conf" \
    >>"${rmq_home}/logs/namesrv.log" 2>&1 &
}

# Launch the Broker in the background.
start_broker() {
  nohup sh "${rmq_home}/bin/mqbroker" -c "${rmq_home}/conf/mqbroker.conf" \
    >>"${rmq_home}/logs/broker.log" 2>&1 &
}

# Force-kill the Broker process(es); the runbroker wrapper is excluded exactly
# as before. Fails when no matching process exists or kill itself fails.
kill_broker() {
  local -a pids=()
  mapfile -t pids < <(
    ps aux | grep "rocketmq.broker" | grep -Ev "(grep|runbroker)" | awk '{print $2}'
  )
  ((${#pids[@]} > 0)) && kill -9 "${pids[@]}"
}

# Encode the three port states as a string such as "111" or "100".
state=""
for port in 9876 10911 10912; do
  if is_listening "$port"; then
    state+="1"
  else
    state+="0"
  fi
done

case "$state" in
  111)
    log "Rmq Cluster is no problem "
    ;;
  100)
    log "Rmq Cluster Broker is Exception"
    start_broker
    ;;
  011)
    log "Rmq Cluster NameServer is Exception"
    if kill_broker; then
      sleep 3
      # Only proceed once both Broker ports are really gone.
      if ! is_listening 10911 && ! is_listening 10912; then
        start_namesrv
        sleep 3
        # Success is judged by the port being open; the original tested $?
        # of an echo, which made the "Failed" branch unreachable.
        if is_listening 9876; then
          log "Rmq Cluster Start NameServer Success"
          start_broker
          log "Rmq Cluster Start Broker Success"
        else
          log "Rmq Cluster Start NameServer Failed"
        fi
      fi
    else
      log "Kill Rmq Broker Service Failed"
    fi
    ;;
  000)
    log "Rmq Cluster NameServer and Broker is Exception"
    start_namesrv
    sleep 3
    if is_listening 9876; then
      log "Rmq Cluster NameServer and Broker is Exception,then start NameServer Success"
      start_broker
      sleep 3
      if is_listening 10911 && is_listening 10912; then
        log "Rmq Cluster NameServer and Broker is Exception,then start Broker Success"
      fi
    fi
    ;;
esac

# Remove reports that are three or more days old. "-mtime +2" also catches
# files older than that, which "-mtime 3" (exactly three days) skipped.
for dir in "${rmq_home}/port/zabbix/" "${report_dir}/"; do
  if [[ -d "$dir" ]]; then
    find "$dir" -type f -mtime +2 -exec rm -f -- {} +
  fi
done
- 停止
# Stop the Broker first, then the NameServer (run from /opt/app/rmq).
# The two commands are independent, so one failing does not skip the other.
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
- 清理
# Wipe all persisted broker data (commit log, consume queues, indexes).
rm -rf /opt/app/rmq/store/*
指令
- 檢視本機建立過的 Topic
- 檢視本機所在隊列叢集的資訊
- 檢視隊列所有消費組的消費情況
指令後追加
-g 消費組名
來查詢指定消費組,組名為 ECID_APID
- 檢視指定消費組的消費者數量/連接
替換 GroupName,組名為 ECID_APID
# Derive the local NameServer address from ifconfig: take the /24 interface
# line, strip the first five characters of its second field, drop 172.* hits.
local_ip="$(ifconfig | grep '255.255.255.0' | awk '{print substr($2,6)}' | grep -v '^172')"
sh /opt/app/rmq/bin/mqadmin consumerConnection -n "${local_ip}:9876" -g GroupName
- 為叢集所有伺服器建立隊列
替換 TOPIC
# Read the NameServer list straight from the broker config, then create or
# update the topic on every broker of DefaultCluster with 12 read/write queues.
namesrv_addr="$(grep namesrvAddr /opt/app/rmq/conf/mqbroker.conf | sed 's/namesrvAddr=//')"
sh /opt/app/rmq/bin/mqadmin updateTopic -n "${namesrv_addr}" -c DefaultCluster -t "TOPIC" -r 12 -w 12
- 删除指定隊列
替換 TOPIC
參考資料
RocketMQ 指令參考文檔
RocketMQ——角色與術語詳解
RocketMQ 中 Topic、Tag、GroupName 的設計初衷