Environment
- CentOS 7.4
- Zookeeper-3.6.1
- Kafka_2.13-2.4.1
- Kafka-manager-2.0.0.2
All of the software in this guide is installed under the /home/javateam directory.
Zookeeper cluster setup
- Add host entries for all three machines: run vim /etc/hosts and add the following:
192.168.30.78 node-78
192.168.30.79 node-79
192.168.30.80 node-80
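The same entries can be appended idempotently instead of by hand; a minimal sketch, using a local `./hosts` file for illustration (point HOSTS_FILE at /etc/hosts on the real machines):

```shell
# Sketch: append each cluster entry only if it is not already present.
# HOSTS_FILE defaults to a local file for the demo; use /etc/hosts on the nodes.
HOSTS_FILE=${HOSTS_FILE:-./hosts}
touch "$HOSTS_FILE"
for entry in "192.168.30.78 node-78" "192.168.30.79 node-79" "192.168.30.80 node-80"; do
  grep -qxF "$entry" "$HOSTS_FILE" || echo "$entry" >> "$HOSTS_FILE"
done
```

Because of the `grep` guard, running the loop twice leaves the file unchanged.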
- Extract the archive:
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
Rename the extracted directory (note: the directory, not the tarball):
mv apache-zookeeper-3.6.1-bin zookeeper
- Add the following to the /etc/profile configuration file, then run source /etc/profile to make it take effect:
export ZOOKEEPER_HOME=/home/javateam/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
- Create a myid file in the directory pointed to by dataDir in zoo.cfg, and write the server's number into it. The myid file holds this server's id, which must match the number after server. in zoo.cfg (e.g. 78 on node-78).
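As a sketch (the directory defaults to a relative path for the demo; on the real machines it is /home/javateam/zookeeper/data, and MYID is 78/79/80 depending on the node):

```shell
# Sketch: create the myid file for this node.
DATA_DIR=${DATA_DIR:-./zookeeper-data}   # /home/javateam/zookeeper/data on the nodes
MYID=${MYID:-78}                         # must match the server.X number in zoo.cfg
mkdir -p "$DATA_DIR"
echo "$MYID" > "$DATA_DIR/myid"
cat "$DATA_DIR/myid"
```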
- Edit the zookeeper configuration. Go to the $ZOOKEEPER_HOME/conf directory, make a copy of zoo_sample.cfg, and rename the copy to zoo.cfg:
# zookeeper heartbeat interval, in ms
tickTime=2000
# time (in ticks) allowed for followers to connect and sync when electing a new leader
initLimit=10
# max heartbeat tolerance between leader and follower: if a follower fails to respond within syncLimit * tickTime, the leader considers it dead and removes it from the server list
syncLimit=5
# data directory
dataDir=/home/javateam/zookeeper/data/
# log directory
dataLogDir=/home/javateam/zookeeper/logs/
# client-facing service port
clientPort=2181
# cluster member addresses
server.78=node-78:2888:3888
server.79=node-79:2888:3888
server.80=node-80:2888:3888
Note: you must create the data and log directories above yourself. The number after server. matches the id in that machine's myid file.
- zookeeper occupies three ports, used as follows:
2181: serves client requests
3888: used for leader election
2888: intra-cluster communication (the leader listens on this port)
Open these ports in the firewall with the following commands, then reload the firewall:
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --reload
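Rather than typing the commands on each node, they can be generated into a small helper script; a sketch (the script name open-zk-ports.sh is made up here):

```shell
# Sketch: write the firewall commands for all three zookeeper ports into a script
# that can then be copied to and run on each node.
{
  for p in 2181 2888 3888; do
    echo "firewall-cmd --zone=public --add-port=$p/tcp --permanent"
  done
  echo "firewall-cmd --reload"
} > open-zk-ports.sh
cat open-zk-ports.sh
```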
- Start zookeeper on each of the three machines with zkServer.sh start, then check with zkServer.sh status. One leader and two followers, as in the screenshot below, means the cluster is up:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiETPwJWZ3ZCMwcTP39zZwpmLENTJENTJ3p1dw1GTth2RaZnWE10d0cUT69mMj1mUtRWMwNDZ4ZlMah3aIJ2MJRkY3pFbOdXQ6xEbk12YohnMMVnTtxkbxcVYoVzVhpXNT10MkNDT29GRjBjUIF2Lc12bj5SYphXa5VWen5WY35iclN3Ztl2Lc9CX6MHc0RHaiojIsJye.jpg)
Kafka cluster setup
- Extract the archive:
tar -zxvf kafka_2.13-2.4.1.tgz
Rename the extracted directory (the directory, not the tarball):
mv kafka_2.13-2.4.1 kafka
- Add the following to the /etc/profile configuration file, then run source /etc/profile to make it take effect:
export KAFKA_HOME=/home/javateam/kafka
export PATH=$PATH:$KAFKA_HOME/bin
- JVM級别參數調優,修改
,添加以下内容:kafka/bin/kafka-server-start.sh
# increase the heap size; the 1G default is too small
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
# use the G1 garbage collector
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
# JMX port to expose
export JMX_PORT="8999"
After the change, the file should look like the screenshot below:
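One detail worth knowing: the stock kafka-server-start.sh only applies its own heap default when KAFKA_HEAP_OPTS is not already set, so placing the export above that check is what makes it win. A sketch of the guard pattern (not the script's exact text):

```shell
# Sketch of the guard used in kafka-server-start.sh: the heap default is only
# applied when KAFKA_HEAP_OPTS is unset, so an earlier export takes precedence.
unset KAFKA_HEAP_OPTS                      # simulate a fresh shell for the demo
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"     # our setting, added above the guard
if [ -z "$KAFKA_HEAP_OPTS" ]; then
  export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"   # the script's 1G default
fi
echo "$KAFKA_HEAP_OPTS"
```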
- 作業系統級别參數調優,增加檔案描述符的限制,使用
添加以下内容:vim /etc/security/limits.conf
* soft nofile 100000
* hard nofile 100000
* soft nproc 65535
* hard nproc 65535
- Edit kafka's configuration file $KAFKA_HOME/config/server.properties as follows:
############################# Server Basics #############################
# unique id of this broker in the cluster; must be a non-negative integer. Changing the IP address without changing broker.id does not affect consumers
broker.id=78
############################# Socket Server Settings #############################
# address and port the broker listens on for client connections
listeners=PLAINTEXT://node-78:9092
# number of threads the broker uses to handle network requests
num.network.threads=3
# number of threads the broker uses for disk I/O; should be at least the number of disks
num.io.threads=8
# socket send buffer size (SO_SNDBUF)
socket.send.buffer.bytes=102400
# socket receive buffer size (SO_RCVBUF)
socket.receive.buffer.bytes=102400
# max size of a single socket request, to protect the broker from OOM; message.max.bytes must be smaller than socket.request.max.bytes
socket.request.max.bytes=104857600
socket.request.max.bytes=104857600
############################# Log Basics #############################
# directories where kafka stores its data; comma-separated if there are several
log.dirs=/home/javateam/kafka/logs
# default number of partitions per topic; overridden by the value given at topic creation
num.partitions=3
# default replication factor for automatically created topics
default.replication.factor=2
# number of threads per data directory used for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# how long a message is retained; the default is 7 days
log.retention.hours=168
# total disk space the broker may use for messages; -1 means no limit
log.retention.bytes=-1
# largest message the broker will accept; the default is 1000012 bytes (~976KB), raised here to 100MB
message.max.bytes=104857600
# size of each log segment file; the default is 1G
log.segment.bytes=1073741824
# how often (in ms) to check whether log segments are eligible for deletion under the retention policy above
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# zookeeper connection addresses; consumers find the brokers by connecting to zookeeper
zookeeper.connect=node-78:2181,node-79:2181,node-80:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
############################# Broker Settings #############################
# do not let replicas that have fallen too far behind become leader
unclean.leader.election.enable=false
# disable kafka's periodic preferred-leader re-election for topic partitions
auto.leader.rebalance.enable=false
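Each broker needs its own broker.id and listener host, so the file above cannot simply be copied to the other two nodes. A small templating sketch (the template/output file names here are made up; only the two per-node lines are shown):

```shell
# Hypothetical templating sketch: stamp broker.id and the listener host per node.
cat > server.properties.template <<'EOF'
broker.id=__NODE__
listeners=PLAINTEXT://node-__NODE__:9092
EOF
NODE=79   # 78 / 79 / 80 depending on the machine
sed "s/__NODE__/$NODE/g" server.properties.template > server.properties.node
cat server.properties.node
```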
- Write a kafka startup script (vim startup.sh) with the following content:
# start kafka in daemon mode
kafka-server-start.sh -daemon /home/javateam/kafka/config/server.properties
- Write a kafka stop script (vim shutdown.sh) with the following content:
# stop the kafka service
kafka-server-stop.sh
- Start the kafka service on each machine with:
sh /home/javateam/kafka/startup.sh
Note: replace the path with wherever your own script lives.
- After startup, connect to zookeeper and inspect the ids node:
zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids
If all three broker ids show up, as in the screenshot below, the cluster is up:
Kafka-manager setup
- Extract the archive:
unzip kafka-manager-2.0.0.2.zip
Rename the extracted directory (the directory, not the zip file):
mv kafka-manager-2.0.0.2 kafka-manager
- Edit the configuration file kafka-manager/conf/application.conf and set kafka-manager.zkhosts to your own zookeeper cluster addresses, for example:
kafka-manager.zkhosts="node-78:2181,node-79:2181,node-80:2181"
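The edit can also be made non-interactively with sed; a sketch against a stand-in application.conf created for the demo (on the real machine, run the sed line against kafka-manager/conf/application.conf):

```shell
# Sketch: rewrite kafka-manager.zkhosts in place, keeping a .bak backup.
cat > application.conf <<'EOF'
kafka-manager.zkhosts="localhost:2181"
EOF
sed -i.bak 's|^kafka-manager\.zkhosts=.*|kafka-manager.zkhosts="node-78:2181,node-79:2181,node-80:2181"|' application.conf
cat application.conf
```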
- Write a kafka-manager startup script (vim startup.sh) with the following content:
nohup /home/javateam/kafka-manager/bin/kafka-manager -Dhttp.port=9000 > /home/javateam/kafka-manager/nohup.out 2>&1 &
- Start kafka-manager with sh /home/javateam/kafka-manager/startup.sh, then visit port 9000 in a browser; a page like the screenshot below means it is working:
If you are unsure how to use kafka-manager, search online; it will not be covered further here.