在開始Kafka環境搭建之前,首先要安裝Linux系統,并在Linux系統上安裝JDK1.8版本,關于linux虛拟機的安裝和linux系統下jdk的安裝可以參考我的博文:
http://blog.csdn.net/yulei_qq/article/details/52132536 linux 虛拟機安裝
http://blog.csdn.net/yulei_qq/article/details/51925673 jdk安裝
安裝好虛拟機和jdk之後,開始Kafka環境的搭建學習
一、下載下傳Kafka安裝檔案.
可以從Apache官網下載下傳最新版本的Kafka安裝檔案,我下載下傳的0.10.0.0版本
下載下傳完成之後,将壓塑檔案解壓到 /usr/local目錄下
[[email protected] ~]# tar -zxvf kafka_2.11-0.10.0.0.tgz -C /usr/local
可以看到/usr/local目錄下有我們解壓後的Kafka目錄檔案
二、啟動服務
1、啟動Zookeeeper服務
Kafka使用了Zookeeper,是以首先如果沒有啟動Zookeeper服務的話,要啟動一個Zookeeper服務。可以使用如下的腳本快速的啟動一個單執行個體的Zookeeper服務.
[[email protected] kafka_2.11-0.10.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
2、啟動Kafka服務
啟動完成zookeeper服務之後,就可以啟動Kafka Server了,可以用如下的腳本啟動.
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-server-start.sh config/server.properties
但是,我啟動失敗了,報如下錯誤:
原因是/etc/hosts 檔案沒有指定目前主機名和ip的對關系,設定下就好了:
[[email protected] kafka_2.11-0.10.0.0]# vim /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.142.131 yulei
當然,你也可以更改config/server.properties 中關于主機名的配置修改成你目前虛拟機的IP位址,可以通過ifconfig檢視IP位址.
三、建立Topic
我們來建立一個名稱為"test" 的topic ,它隻有一個分區,隻有一個副本.
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
我們可以運作topic 下的list 指令來檢視建立的topic
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
注:二者擇一的,你也可以配置你的broker當不存在topic時自動建立topics 來代替手動建立topic.
四、發送一些消息
伴随Kafka而來的一個指令行用戶端,可以從一個檔案或标準輸入 擷取輸入資料,然後作為一個messages 發送給Kafka叢集. 預設的每行将作為一條單獨的消息發送. 運作producer ,然後在控制台輸入一些消息發送給 服務端。
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is anther message
五、啟動一個Consumer
Kafka也提供了一個consumer 指令行用戶端,并将消息在在控制台标準的展示出來.
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is anther message
你可以在一個終端運作consumer,另一個終端運作producer,然後你在producer輸入消息,就可以看在你輸入的消息在consumer端展示出來了. 所有的指令行工具都有額外的可選操作。 運作這個指令不帶任務參數,将會展示詳細的使用方法。 例如:
六、搭建一個 multi-broker 的叢集
目前為止,我們啟動了單個broker,但是那樣不好玩. 對于Kafka而言,單一的broker 隻是一個叢集中大小的一個, 是以沒有什麼變化除了啟動多個broker執行個體。隻是去感受下,我們來擴充我們的叢集到三個節點 (仍然在我們本地的機器上).
首先為每一個broker制作一個配置檔案.
[[email protected] kafka_2.11-0.10.0.0]# cp config/server.properties config/server-1.properties
[[email protected] kafka_2.11-0.10.0.0]# cp config/server.properties config/server-2.properties
現在讓我們剛剛建立的2個檔案配置如下屬性.
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
broker.id 這個屬性在叢集(cluster)中唯一的辨別一個節點(node)。 我們重寫了端口和日志目錄,因為我們在同一台機器上運作這些broker,需要阻止broker注冊同一端口和修改彼此資料。
我們已經啟動了一個Zookeeper和一個單節點broker,是以接下來隻要啟動上面兩個新的節點。
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-server-start.sh config/server-1.properties &
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-server-start.sh config/server-2.properties &
現在建立一個擁有三個副本的、一個分區的Topic.
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
好了,現在我們搭建了一個叢集,但是我們怎麼知道每個節點的資訊呢,運作"describe topics"指令看看:
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
這裡解釋一下上面的輸出 :第一行是對所有分區的一個總結,其餘每一行介紹每一個分區的資訊. 因為我們的Topic 隻有一個分區,是以第一行下面隻加了一行。
下面介紹一下上面輸出的各個字段的含義:
- "leader" 節點負責給定分區 消息的讀和寫,leader 節點是從分區中所有節點中随機選擇的."replicas"
- "replicas" 列出了該分區複制日志的節點,不管節點是否是Leader節點或是否在服務運作中.
- "isr" 是 "同步" replicas 的一個集合。是 "replicas" 的一個子集,列出目前運作服務的節點 和以後的Leader節點
在我們的例子當中,節點(node) 1 是 該topic 唯一分區的一個leader。
我們可以運作同樣的指令對于我們最開始建立的topic test:
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
這裡沒有什麼好驚訝的,原始的topic "test" 沒有副本 運作在Server 0 ,這個Server 0是我們建立時在叢集中的唯一Server.
讓我們釋出一些消息到我們新的Topic.
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
....
my test message 1
my test message 2
^C[[email protected] kafka_2.11-0.10.0.0]#
讓我們消費上面釋出的消息:
^C[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
....
my test message 1
my test message 2
現在讓我們來測一下容錯能力. Broker 1 扮演者Leader的角色,是以我們把node 1的服務停了,即server-1.properties配置檔案對應的那個服務. 注:官網上的如下指令查詢的時候,指令行沒有任何顯示,查詢不到對應broker 的程序,ps -ef 可以查詢所有的程序,但是也看不出哪個程序對應哪個服務,是以測試的時候,啟動指令後面不要加& ,這樣關閉服務的時候可以直接在啟動終端ctrl+C來關閉對應的broker服務,暫時還沒有找到好的方法,有知道的朋友可以留言分享下,謝謝.
ps | grep server-1.properties
然後用--describe指令檢視下,另外一個節點被選做了leader,node 1 不再出現在 in-sync 副本清單中:
雖然最初負責寫leader的down掉了,但是之前的消息還是可以消費的.
^C[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2