天天看點

KAFKA叢集搭建

一、簡介  

Kafka是Apache下的一個子項目,是一個高性能跨語言分布式釋出/訂閱消息隊列系統,使用Scala編寫,它以可水準擴充和高吞吐率而被廣泛使用。目前越來越多的開源分布式處理系統如Cloudera、Apache Storm、Spark都支援與Kafka內建。

搭建實時資料流管道,在系統或應用之間可靠的擷取資料

搭建對資料流進行轉換或相應的實時流應用程式、

  為了了解Kafka具體如何實作這些功能, 首先了解幾個概念:

Kafka是作為叢集,運作在一台或多台伺服器上的.

Kafka叢集用主題(topics)來分類别儲存資料流(records).

每個記錄(record)由一個鍵(key),一個值(value)和一個時間戳(timestamp)組成

  Kafka有4個核心APIs:

Producer API負責生産資料流,允許應用程式将記錄流釋出到一個或多個Kafka主題(topics).

Consumer API負責使用資料流,允許應用程式訂閱一個或多個主題并處理為其生成的資料流.

Streams API負責處理或轉化資料流,允許應用程式充當資料流處理器的角色, 處理來自一個或多個主題的輸入資料流,并産生輸出資料流到一個或多個輸出主題,一次來有效地将輸入流轉換成輸出流.

Connector API負責将資料流與其他應用或系統結合,允許搭建建和運作可重複使用的生産者或消費者,将Kafka資料主題與現有應用程式或資料系統相連接配接的。 例如,關系資料庫的連接配接器可能會将表的每個更改的事件,都捕獲為一個資料流.

二、環境準備

  kafka叢集的搭建是建立在jdk和zookeeper叢集環境之上的;文中環境在Ubuntu1404系統上搭建;

1)安裝JAVA

<code># add-apt-repository ppa:webupd8team/java </code>

<code># apt-get update</code>

<code># apt-get install oracle-java8-installer </code>

<code># java -version //檢驗Java版本</code>

<code>java version </code><code>"1.8.0_111"</code>

<code>Java(TM) SE Runtime Environment (build 1.8.0_111-b14)</code>

<code>Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)</code>

2)安裝ZOOKEEPER叢集

3)機器清單,/etc/hosts檔案内容如下:

<code>172.30.100.1 kafka-001</code>

<code>172.30.100.2 kafka-002</code>

<code>172.30.100.3 kafka-003</code>

<code>172.30.100.4 kafka-004</code>

<code>172.30.100.5 kafka-005</code>

<code>172.30.100.21 zookeeper-001</code>

<code>172.30.100.22 zookeeper-002</code>

<code>172.30.100.23 zookeeper-003</code>

<code>172.30.100.24 zookeeper-004</code>

<code>172.30.100.25 zookeeper-005</code>

三、Kafka叢集搭建

1)下載下傳相應版本kafka軟體包:

  官方0.10.2.0版本下載下傳連結如下:

<a href="http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz" target="_blank">http://mirrors.hust.edu.cn/apache/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz</a>

這裡需要注意kafka的版本,下載下傳之前一定要確定你的client端是否支援目前kafka server的版本,否則就得重新安裝。

這裡給出官方提供的其他幾個下載下傳連結,包括其他版本的都在下面的位址裡能夠找到。

<a href="https://www.apache.org/dyn/closer.cgi#verify" target="_blank">https://www.apache.org/dyn/closer.cgi#verify</a>

2)配置kafka叢集

  下載下傳完之後解壓,并配置:

<code>&gt; </code><code>tar</code> <code>-xzf kafka_2.11-0.10.2.0.tgz</code>

<code>&gt; </code><code>cd</code> <code>kafka_2.11-0.10.2.0</code>

<code>&gt; vim config</code><code>/server</code><code>.properties</code>

kafka的配置檔案示例:

<code>#唯一辨別在叢集中的ID,要求是正數。</code>

<code>broker.</code><code>id</code><code>=1</code>

<code>delete.topic.</code><code>enable</code><code>=</code><code>true</code>

<code># 監聽位址和端口号</code>

<code>listeners=PLAINTEXT:</code><code>//172</code><code>.30.100.1:9092</code>

<code># 處理網絡請求的最大線程數</code>

<code>num.network.threads=9</code>

<code># 處理磁盤I/O的線程數</code>

<code>num.io.threads=16</code>

<code># # leader中進行複制的線程數,增大這個數值會增加relipca的IO</code>

<code>num.replica.fetchers=3</code>

<code>#配置log的檔案目錄,前提確定目錄存在</code>

<code>log.</code><code>dirs</code><code>=</code><code>/data/kafka-logs</code>

<code># 每個topic的分區個數,更多的partition會産生更多的segment file</code>

<code>num.partitions=2</code>

<code># 配置zookeeper服務的位址</code>

<code>zookeeper.connect=zookeeper-001:2181,zookeeper-002:2181,zookeeper-003:2181,zookeeper-004:2181,zookeeper-005:2181</code>

這裡注意的地方:listeners一定要配置成為IP位址;如果配置為localhost或伺服器的hostname,在使用java發送資料時就會抛出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因為在沒有配置advertised.host.name 的情況下,Kafka并沒有像官方文檔宣稱的那樣改為廣播我們配置的host.name,而是廣播了主機配置的hostname。遠端的用戶端并沒有配置 hosts,是以自然是連接配接不上kafka。

  複制server.properties到其他kafka節點的機器上,并修改broker.id,listeners。

3)服務的啟動和停止

  啟動服務:

<code>bin</code><code>/kafka-server-start</code><code>.sh -daemon config</code><code>/server</code><code>.properties</code>

注意一定要加-daemon選項,不然終端退出,服務會随之退出。  

  停止服務:

<code> </code><code>bin</code><code>/kafka-server-stop</code><code>.sh</code>

四、Kafka叢集測試

1)建立topic:

<code>&gt; bin</code><code>/kafka-topics</code><code>.sh --create --zookeeper zookeeper-001:2181 --replication-factor 3 --partitions 2 --topic </code><code>test</code>

這裡的--zookeeper可以随機指定一個zookeeper的位址。

檢測topic是否建立成功:

<code>&gt; bin</code><code>/kafka-topics</code><code>.sh --list --zookeeper zookeeper-001:2181</code>

<code>test</code>

檢視topic描述資訊:

<code> </code><code>bin</code><code>/kafka-topics</code><code>.sh --describe --zookeeper zookeeper-001:2181 --topic </code><code>test</code>

<code>Topic:CloudMonitor  PartitionCount:5    ReplicationFactor:3 Configs:</code>

<code>    </code><code>Topic: </code><code>test</code>    <code>Partition: 0   Leader: 2  Replicas: 2,3,4    Isr: 2,3,4</code>

<code>    </code><code>Topic: </code><code>test</code>    <code>Partition: 1   Leader: 3  Replicas: 3,4,5    Isr: 3,4,5</code>

<code>    </code><code>Topic: </code><code>test</code>    <code>Partition: 2   Leader: 4  Replicas: 4,1,2    Isr: 4,1,2</code>

<code>    </code><code>Topic: </code><code>test</code>    <code>Partition: 3   Leader: 5  Replicas: 5,2,3    Isr: 5,3,2</code>

<code>    </code><code>Topic: </code><code>test</code>    <code>Partition: 4   Leader: 1  Replicas: 1,3,4    Isr: 1,3,4</code>

2)生産消息:

<code>&gt; bin</code><code>/kafka-console-producer</code><code>.sh --broker-list localhost:9092 --topic </code><code>test</code>

<code>This is a message</code>

<code>This is another message</code>

3)消費消息:

<code>&gt; bin</code><code>/kafka-console-consumer</code><code>.sh --bootstrap-server localhost:9092 --topic </code><code>test</code> <code>--from-beginning</code>

會看到剛才生産的兩條messsage。

結尾:

  結尾附上幾個常用Message Queue的對比,摘自網絡:

RabbitMQ

  RabbitMQ是使用Erlang編寫的一個開源的消息隊列,本身支援很多的協定:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量級,更适合于企業級的開發。同時實作了Broker構架,這意味着消息在發送給用戶端時先在中心隊列排隊。對路由,負載均衡或者資料持久化都有很好的支援。

Redis

  Redis是一個基于Key-Value對的NoSQL資料庫,開發維護很活躍。雖然它是一個Key-Value資料庫存儲系統,但它本身支援MQ功能,是以完全可以當做一個輕量級的隊列服務來使用。對于RabbitMQ和Redis的入隊和出隊操作,各執行100萬次,每10萬次記錄一次執行時間。測試資料分為128Bytes、512Bytes、1K和10K四個不同大小的資料。實驗表明:入隊時,當資料比較小時Redis的性能要高于RabbitMQ,而如果資料大小超過了10K,Redis則慢的無法忍受;出隊時,無論資料大小,Redis都表現出非常好的性能,而RabbitMQ的出隊性能則遠低于Redis。

ZeroMQ

  ZeroMQ号稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。ZeroMQ能夠實作RabbitMQ不擅長的進階/複雜的隊列,但是開發人員需要自己組合多種技術架構,技術上的複雜度是對這MQ能夠

應用成功的挑戰。ZeroMQ具有一個獨特的非中間件的模式,你不需要安裝和運作一個消息伺服器或中間件,因為你的應用程式将扮演這個伺服器角色。你隻需要簡單的引用ZeroMQ程式庫,可以使用NuGet安裝,然後你就可以愉快的在應用程式之間發送消息了。但是ZeroMQ僅提供非持久性的隊列,也就是說如果當機,資料将會丢失。其中,Twitter的Storm 0.9.0以前的版本中預設使用ZeroMQ作為資料流的傳輸(Storm從0.9版本開始同時支援ZeroMQ和Netty作為傳輸子產品)。

ActiveMQ

  ActiveMQ是Apache下的一個子項目。 類似于ZeroMQ,它能夠以代理人和點對點的技術實作隊列。同時類似于RabbitMQ,它少量代碼就可以高效地實作進階應用場景。

Kafka/Jafka

  Kafka是Apache下的一個子項目,是一個高性能跨語言分布式釋出/訂閱消息隊列系統,而Jafka是在Kafka之上孵化而來的,即Kafka的一個更新版。具有以下特性:快速持久化,可以在O(1)的系統開銷下進行消息持久化;高吞吐,在一台普通的伺服器上既可以達到10W/s的吞吐速率;完全的分布式系統,Broker、Producer、Consumer都原生自動支援分布式,自動實作負載均衡;支援Hadoop資料并行加載,對于像Hadoop的一樣的日志資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka通過Hadoop的并行加載機制統一了線上和離線的消息處理。Apache Kafka相對于ActiveMQ是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。

      本文轉自Jx戰壕  51CTO部落格,原文連結:http://blog.51cto.com/xujpxm/1934487,如需轉載請自行聯系原作者