天天看點

我個人的kafka_2.12-1.1.0實踐:安裝與測試,單機版(★firecat推薦★)

一、我的伺服器環境:CentOS7,kafka依賴JVM環境,要求安裝有JDK

使用CentOS7系統,先删除自帶的JDK,自帶的不好用(例如jps終端指令不能用等)。下載下傳官網完整的JDK,并設定環境變量JAVA_HOME。詳情見:

https://blog.csdn.net/libaineu2004/article/details/80060812

[root@localhost ~]# java -version

openjdk version "1.8.0_141"

OpenJDK Runtime Environment (build 1.8.0_141-b16)

OpenJDK 64-Bit Server VM (build 25.141-b16, mixed mode)

二、下載下傳

Kafka(

http://kafka.apache.org/ )依賴ZooKeeper http://mirrors.shuosc.org/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz  --- 穩定版 http://mirrors.shuosc.org/apache/kafka/1.1.0/kafka-1.1.0-src.tgz http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz

由于kafka_2.12-1.1.0.tgz是可執行程式,裡面內建了zookeeper簡易版,如果隻是單機測試那麼僅僅下載下傳這個就可以了。但是簡易版功能有限。在生産環境下,建議還是直接下載下傳官方zookeeper軟體。

三、運作與測試,以下都是終端指令

1、cd kafka_2.12-1.1.0

2、啟動Zookeeper,如果無報錯則說明啟動成功。daemon以及nohup &是實作在背景啟動,即守護程序。

方法1(推薦) ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

方法2 nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties&

3、啟動Kafka,如果無報錯則說明啟動成功。daemon以及nohup &是實作在背景啟動,即守護程序。

方法1 (推薦)./bin/kafka-server-start.sh -daemon config/server.properties

方法2 nohup ./bin/kafka-server-start.sh config/server.properties&

4、檢視程序是否正常運作,通過檢測2181與9092端口,2181是zookeeper,9092是kafka

netstat -tunlp|egrep "(2181|9092)"

tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          

tcp        0      0 :::9092                     :::*                        LISTEN      28094/java

我們也可以使用jps終端指令檢視是否啟動:

[root@bogon kafka_2.12-1.1.0]# jps

4384 Kafka

3921 QuorumPeerMain

4412 Jps

5、建立主題,例如test

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --partitions 3 --topic emqtest

--zookeeper : zookeeper叢集清單,用英文逗号分隔。可以不用指定zookeeper整個叢集内的節點清單,隻指定某個或某幾個zookeeper節點清單也是可以的

replication-factor : 複制數目,提供failover機制;1代表隻在一個broker上有資料記錄,一般值都大于1,代表一份資料會自動同步到其他的多個broker,防止某個broker當機後資料丢失。

partitions : 一個topic可以被切分成多個partitions,一個消費者可以消費多個partitions,但一個partitions隻能被一個消費者消費,是以增加partitions可以增加消費者的吞吐量。kafka隻保證一個partitions内的消息是有序的,多個一個partitions之間的資料是無序的。

注意,

(1)使用叢集時,必須手動先建立topic,指定zookeeper節點清單;單機也應該要建立。否則用戶端會報錯。

(2)主題名不建議使用"."和"_"字元。

[root@localhost kafka_2.12-1.0.0copy1]# ./bin/kafka-topics.sh --create --zookeeper 172.16.6.170:2181,172.16.6.170:2182,172.16.6.170:2183 --replication-factor 3 --

partitions 3 --topic emq_broker_message1

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.

Created topic "emq_broker_message1".

partitions 3 --topic emqtest

Created topic "emqtest".

(3)建議建立主題時,指定分區數為kafka broker節點數目的整數倍,例如broker有3台,那麼分區就設定為3個或6個或9個--partitions 3。建立完主題,可以在路徑log.dirs看到主題和分區的檔案夾。詳情參見《Kafka入門與實踐.牟大恩》3.6.1節描述

(4)--replication-factor 3,《kafka權威指南》6.3.1節建議複制系數為3.但是前提是數值不能大于broker的節點數。

如果broker節點數為1,複制系數設定為3,則會報錯:

Error while executing topic command : Replication factor: 3 larger than available brokers: 1.

(5)單機時,zookeeper數目是1;叢集時,zookeeper的數目建議為奇數個,至少3個。

(6)server.properties檔案設定預設值

num.partitions=3

default.replication.factor=1

6、檢視主題

./bin/kafka-topics.sh --describe --zookeeper  localhost:2181 --topic test

./bin/kafka-topics.sh --list --zookeeper localhost:2181

例如:broker叢集,有3個節點。同時給主題mynewt配置設定了3個分區。每個分區都有leader和follow。

[root@bogon kafka_2.12-1.1.0]# ./bin/kafka-topics.sh --describe --zookeeper  localhost:2181 --topic mynewt

Topic:mynewt PartitionCount:3 ReplicationFactor:3 Configs:

Topic: mynewt Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

Topic: mynewt Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

Topic: mynewt Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

7、開啟一個終端,發送消息,生産者的消息要發往kafka

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

8、另起一個終端,消費消息,消費者的消息來自zookeeper(協調轉發)

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

将來新版本需要寫成./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic test --from-beginning

9、結束程序

./bin/kafka-server-stop.sh

./bin/zookeeper-server-stop.sh

kafka啟動時先啟動zookeeper,再啟動kafka;關閉時相反,先關閉kafka,再關閉zookeeper。

四、總結

1、kafka有什麼?

producer 消息的生成者,即釋出消息

consumer 消息的消費者,即訂閱消息

broker Kafka以叢集的方式運作,可以由一個或多個服務組成,服務即broker

zookeeper 協調轉發

2、配置檔案與消息持久化路徑

/config/server.properties

# A comma seperated list of directories under which to store log files

log.dirs=/tmp/kafka-logs

例如用戶端發送test為主題的消息,會持久化在這個檔案:/tmp/kafka-logs/test-0/00000000000000000000.log

主題清單會記錄在/tmp/zookeeper

/config/zookeeper.properties

# the directory where the snapshot is stored.

dataDir=/tmp/zookeeper

# the port at which the clients will connect

clientPort=2181

建議修改kafka的日志目錄和zookeeper資料目錄,因為這兩項預設放在tmp目錄,而tmp目錄中内容會随重新開機而丢失:

server.properties:

   log.dirs=/tmp/kafka-logs

   修改為

   log.dirs=/usr/local/kafka/logs

zookeeper.properties

   dataDir=/tmp/zookeeper

   dataDir=/usr/local/zookeeper/data

3、配置檔案bootstrap.servers 和 broker.list差別:

本以為是兩個參數,其實是實作一個功能,檢視源代碼後發現broker.list是舊版本指令

4、自己寫的shell啟動和關閉kafka腳本,kafka.sh -- 這個腳本有問題,不穩定

#!/bin/bash
kafka_home=/root/Downloads/kafka_2.12-1.0.0/
case $1 in 
   start)  # 服務啟動需要做的步驟
           echo "zookeeper start"
           $kafka_home/bin/zookeeper-server-start.sh -daemon $kafka_home/config/zookeeper.properties
           #sleep 1
           echo "kafka start"
           $kafka_home/bin/kafka-server-start.sh -daemon $kafka_home/config/server.properties
           #sleep 1
           ;;
   stop)   # 服務停止需要做的步驟
           echo "kafka stop"
           $kafka_home/bin/kafka-server-stop.sh
           #sleep 1
           echo "zookeeper stop"
           $kafka_home/bin/zookeeper-server-stop.sh
           #sleep 1
           ;;
   restart) # 重新開機服務需要做的步驟
            ...
           ;;
   status) # 檢視狀态需要做的步驟
             ...
           ;;
   *) echo "$0 {start|stop|restart|status}"
           exit 4
           ;;
esac      

5、zookeeper叢集

kafka是通過zookeeper來管理叢集。 kafka軟體包内雖然包括了一個簡版的zookeeper,但是感覺功能有限。在生産環境下,建議還是直接下載下傳官方zookeeper軟體。 下載下傳最新版的zookeeper軟體:

http://zookeeper.apache.org/ http://mirror.bit.edu.cn/apache/zookeeper/

zookeeper-3.4.11.tar.gz

歡迎通路姊妹篇:我個人的kafka broker和zookeeper叢集實踐(★firecat推薦★)

---

參考文獻

物聯網架構成長之路(8)-EMQ-Hook了解、連接配接Kafka發送消息

centos7 安裝 kafka_2.11-1.0.0

zookeeper安裝及部署

Kafka安裝及部署

CentOS下ZooKeeper單機模式、叢集模式安裝

Kafka單機、叢集模式安裝詳解(一)

kafka叢集監控之KafkaOffsetMonitor 0.4.1版本 以及用sbt編譯KafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor