天天看點

Kafka學習筆記2: 快速入門

在開始Kafka環境搭建之前,首先要安裝Linux系統,并在Linux系統上安裝JDK1.8版本,關于linux虛拟機的安裝和linux系統下jdk的安裝可以參考我的博文:

http://blog.csdn.net/yulei_qq/article/details/52132536       linux 虛拟機安裝

http://blog.csdn.net/yulei_qq/article/details/51925673       jdk安裝

安裝好虛拟機和jdk之後,開始Kafka環境的搭建學習

一、下載下傳Kafka安裝檔案.

可以從Apache官網下載下傳最新版本的Kafka安裝檔案,我下載下傳的0.10.0.0版本

Kafka學習筆記2: 快速入門

下載下傳完成之後,将壓塑檔案解壓到 /usr/local目錄下

[[email protected] ~]# tar -zxvf kafka_2.11-0.10.0.0.tgz -C /usr/local
           

可以看到/usr/local目錄下有我們解壓後的Kafka目錄檔案

Kafka學習筆記2: 快速入門

二、啟動服務

1、啟動Zookeeeper服務

Kafka使用了Zookeeper,是以首先如果沒有啟動Zookeeper服務的話,要啟動一個Zookeeper服務。可以使用如下的腳本快速的啟動一個單執行個體的Zookeeper服務.

[[email protected] kafka_2.11-0.10.0.0]# bin/zookeeper-server-start.sh config/zookeeper.properties
           
Kafka學習筆記2: 快速入門

2、啟動Kafka服務

啟動完成zookeeper服務之後,就可以啟動Kafka Server了,可以用如下的腳本啟動.

[[email protected] kafka_2.11-0.10.0.0]#  bin/kafka-server-start.sh config/server.properties
           

但是,我啟動失敗了,報如下錯誤:

Kafka學習筆記2: 快速入門

原因是/etc/hosts 檔案沒有指定目前主機名和ip的對關系,設定下就好了:

[[email protected] kafka_2.11-0.10.0.0]# vim /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.142.131  yulei
           

當然,你也可以更改config/server.properties  中關于主機名的配置修改成你目前虛拟機的IP位址,可以通過ifconfig檢視IP位址.

三、建立Topic

我們來建立一個名稱為"test" 的topic ,它隻有一個分區,隻有一個副本. 

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
           

我們可以運作topic 下的list 指令來檢視建立的topic

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
           

注:二者擇一的,你也可以配置你的broker當不存在topic時自動建立topics 來代替手動建立topic.

四、發送一些消息 

伴随Kafka而來的一個指令行用戶端,可以從一個檔案或标準輸入 擷取輸入資料,然後作為一個messages 發送給Kafka叢集. 預設的每行将作為一條單獨的消息發送. 運作producer ,然後在控制台輸入一些消息發送給 服務端。

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is anther message
           

五、啟動一個Consumer

Kafka也提供了一個consumer 指令行用戶端,并将消息在在控制台标準的展示出來.

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is anther message
           

你可以在一個終端運作consumer,另一個終端運作producer,然後你在producer輸入消息,就可以看在你輸入的消息在consumer端展示出來了. 所有的指令行工具都有額外的可選操作。 運作這個指令不帶任務參數,将會展示詳細的使用方法。 例如:

Kafka學習筆記2: 快速入門

六、搭建一個 multi-broker 的叢集

目前為止,我們啟動了單個broker,但是那樣不好玩.  對于Kafka而言,單一的broker 隻是一個叢集中大小的一個, 是以沒有什麼變化除了啟動多個broker執行個體。隻是去感受下,我們來擴充我們的叢集到三個節點 (仍然在我們本地的機器上).

首先為每一個broker制作一個配置檔案.

[[email protected] kafka_2.11-0.10.0.0]# cp config/server.properties config/server-1.properties
[[email protected] kafka_2.11-0.10.0.0]# cp config/server.properties config/server-2.properties
           

現在讓我們剛剛建立的2個檔案配置如下屬性.

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2
           

broker.id  這個屬性在叢集(cluster)中唯一的辨別一個節點(node)。 我們重寫了端口和日志目錄,因為我們在同一台機器上運作這些broker,需要阻止broker注冊同一端口和修改彼此資料。

我們已經啟動了一個Zookeeper和一個單節點broker,是以接下來隻要啟動上面兩個新的節點。

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-server-start.sh config/server-1.properties &
           
[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-server-start.sh config/server-2.properties &
           

現在建立一個擁有三個副本的、一個分區的Topic.

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
           

好了,現在我們搭建了一個叢集,但是我們怎麼知道每個節點的資訊呢,運作"describe topics"指令看看:

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           

這裡解釋一下上面的輸出 :第一行是對所有分區的一個總結,其餘每一行介紹每一個分區的資訊.  因為我們的Topic 隻有一個分區,是以第一行下面隻加了一行。

下面介紹一下上面輸出的各個字段的含義:

  • "leader"  節點負責給定分區 消息的讀和寫,leader 節點是從分區中所有節點中随機選擇的."replicas"
  • "replicas" 列出了該分區複制日志的節點,不管節點是否是Leader節點或是否在服務運作中.
  • "isr" 是 "同步" replicas 的一個集合。是 "replicas" 的一個子集,列出目前運作服務的節點 和以後的Leader節點

在我們的例子當中,節點(node) 1 是  該topic 唯一分區的一個leader。

我們可以運作同樣的指令對于我們最開始建立的topic test:

[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
           

這裡沒有什麼好驚訝的,原始的topic  "test" 沒有副本 運作在Server 0 ,這個Server 0是我們建立時在叢集中的唯一Server.

讓我們釋出一些消息到我們新的Topic.

[[email protected] kafka_2.11-0.10.0.0]#  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
....
my test message 1   
my test message 2
^C[[email protected] kafka_2.11-0.10.0.0]# 
           

讓我們消費上面釋出的消息:

^C[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
....
my test message 1
my test message 2
           

現在讓我們來測一下容錯能力. Broker 1  扮演者Leader的角色,是以我們把node 1的服務停了,即server-1.properties配置檔案對應的那個服務. 注:官網上的如下指令查詢的時候,指令行沒有任何顯示,查詢不到對應broker 的程序,ps  -ef 可以查詢所有的程序,但是也看不出哪個程序對應哪個服務,是以測試的時候,啟動指令後面不要加& ,這樣關閉服務的時候可以直接在啟動終端ctrl+C來關閉對應的broker服務,暫時還沒有找到好的方法,有知道的朋友可以留言分享下,謝謝.

ps | grep server-1.properties      

然後用--describe指令檢視下,另外一個節點被選做了leader,node 1 不再出現在 in-sync 副本清單中:

Kafka學習筆記2: 快速入門

雖然最初負責寫leader的down掉了,但是之前的消息還是可以消費的.

^C[[email protected] kafka_2.11-0.10.0.0]# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2