點選上方 "程式設計技術圈"關注, 星标或置頂一起成長
背景回複“大禮包”有驚喜禮包!
每日英文
Go for the happy endings, because life doesn't have any sequels.
為了美好的結局而努力奮鬥吧!因為人生沒有續集。
每日掏心話
趁着還年輕,還有時間揮霍,生活就是一種心态,隻有擁有好心态的人才不會讓生活發黴。
責編:樂樂 | 來自:cnblogs.com/along21/p/10278100.html
程式設計技術圈(ID:study_tech)第 1287次推文
往日回顧:人臉識别的時候,一定要穿上衣服啊!
正文
認識 kafka
kafka簡介
Kafka 是一個分布式流媒體平台,kafka官網:http://kafka.apache.org/
1)流媒體平台有三個關鍵功能:
- 釋出和訂閱記錄流,類似于消息隊列或企業消息傳遞系統。
- 以容錯的持久方式存儲記錄流。
- 記錄發生時處理流。
2)Kafka通常用于兩大類應用:
- 建構可在系統或應用程式之間可靠擷取資料的實時流資料管道
- 建構轉換或響應資料流的實時流應用程式
3)首先是幾個概念:
- Kafka作為一個叢集運作在一個或多個可跨多個資料中心的伺服器上。
- Kafka叢集以稱為** topics主題**的類别存儲記錄流。
- 每條記錄都包含一個鍵,一個值和一個時間戳。
4)Kafka有四個核心API:
- Producer API(生産者API)允許應用程式釋出記錄流至一個或多個kafka的topics(主題)。
- Consumer API(消費者API)允許應用程式訂閱一個或多個topics(主題),并處理所産生的對他們記錄的資料流。
- **Streams API(流API)**允許應用程式充當流處理器,從一個或多個topics(主題)消耗的輸入流,并産生一個輸出流至一個或多個輸出的topics(主題),有效地變換所述輸入流,以輸出流。
- Connector API(連接配接器API)允許建構和運作kafka topics(主題)連接配接到現有的應用程式或資料系統中重用生産者或消費者。例如,關系資料庫的連接配接器可能捕獲對表的每個更改。
在Kafka中,用戶端和伺服器之間的通信是通過簡單,高性能,語言無關的TCP協定完成的。此協定已版本化并保持與舊版本的向後相容性。Kafka提供Java用戶端,但用戶端有多種語言版本。
1.2 Topics主題 和 partitions分區
我們首先深入了解 Kafka 為記錄流提供的核心抽象 - 主題topics
一個Topic可以認為是一類消息,每個topic将被分成多個partition(區),每個partition在存儲層面是append log檔案
主題是釋出記錄的類别或訂閱源名稱。Kafka的主題總是多使用者; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入它的資料。
對于每個主題,Kafka叢集都維護一個如下所示的分區日志:
每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的送出日志中。分區中的記錄每個都配置設定了一個稱為偏移的順序ID号,它唯一地辨別分區中的每個記錄。
Kafka叢集持久儲存所有已釋出的記錄 - 無論是否已使用 - 使用可配置的保留期。例如,如果保留政策設定為兩天,則在釋出記錄後的兩天内,它可供使用,之後将被丢棄以釋放空間。Kafka的性能在資料大小方面實際上是恒定的,是以長時間存儲資料不是問題。
實際上,基于每個消費者保留的唯一進制資料是該消費者在日志中的偏移或位置。這種偏移由消費者控制:通常消費者在讀取記錄時會線性地提高其偏移量,但事實上,由于該位置由消費者控制,是以它可以按照自己喜歡的任何順序消費記錄。例如,消費者可以重置為較舊的偏移量來重新處理過去的資料,或者跳到最近的記錄并從“現在”開始消費。
這些功能組合意味着Kafka 消費者consumers 非常cheap - 他們可以來來往往對叢集或其他消費者沒有太大影響。例如,您可以使用我們的指令行工具“tail”任何主題的内容,而無需更改任何現有使用者所消耗的内容。
日志中的分區有多種用途。首先,它們允許日志擴充到超出适合單個伺服器的大小。每個單獨的分區必須适合托管它的伺服器,但主題可能有許多分區,是以它可以處理任意數量的資料。其次,它們充當了并行性的機關 - 更多的是它。
1.3 Distribution配置設定
一個Topic的多個partitions,被分布在kafka叢集中的多個server上;每個server(kafka執行個體)負責partitions中消息的讀寫操作;此外kafka還可以配置partitions需要備份的個數(replicas),每個partition将會被備份到多台機器上,以提高可用性.
搜尋公衆号Linux中文社群背景回複“私房菜”,擷取一份驚喜禮包。
基于replicated方案,那麼就意味着需要對多個備份進行排程;每個partition都有一個server為"leader";leader負責所有的讀寫操作,如果leader失效,那麼将會有其他follower來接管(成為新的leader);follower隻是單調的和leader跟進,同步消息即可..由此可見作為leader的server承載了全部的請求壓力,是以從叢集的整體考慮,有多少個partitions就意味着有多少個"leader",kafka會将"leader"均衡的分散在每個執行個體上,來確定整體的性能穩定。
1.4 Producers生産者 和 Consumers消費者
1.4.1 Producers生産者
Producers 将資料釋出到指定的topics 主題。同時Producer 也能決定将此消息歸屬于哪個partition;比如基于"round-robin"方式或者通過其他的一些算法等。
1.4.2 Consumers
- 本質上kafka隻支援Topic.每個consumer屬于一個consumer group;反過來說,每個group中可以有多個consumer.發送到Topic的消息,隻會被訂閱此Topic的每個group中的一個consumer消費。
- 如果所有使用者執行個體具有相同的使用者組,則記錄将有效地在使用者執行個體上進行負載平衡。
- 如果所有消費者執行個體具有不同的消費者組,則每個記錄将廣播到所有消費者程序。
分析:兩個伺服器Kafka群集,托管四個分區(P0-P3),包含兩個使用者組。消費者組A有兩個消費者執行個體,B組有四個消費者執行個體。
在Kafka中實作消費consumption 的方式是通過在消費者執行個體上劃分日志中的分區,以便每個執行個體在任何時間點都是配置設定的“公平份額”的獨占消費者。維護組中成員資格的過程由Kafka協定動态處理。如果新執行個體加入該組,他們将從該組的其他成員接管一些分區; 如果執行個體死亡,其分區将分發給其餘執行個體。
Kafka僅提供分區内記錄的總訂單,而不是主題中不同分區之間的記錄。對于大多數應用程式而言,按分區排序與按鍵分區資料的能力相結合就足夠了。但是,如果您需要對記錄進行總訂單,則可以使用僅包含一個分區的主題來實作,但這将意味着每個使用者組隻有一個使用者程序。
1.5 Consumers kafka確定
- 發送到partitions中的消息将會按照它接收的順序追加到日志中。也就是說,如果記錄M1由與記錄M2相同的生成者發送,并且首先發送M1,則M1将具有比M2更低的偏移并且在日志中更早出現。
- 消費者執行個體按照它們存儲在日志中的順序檢視記錄。對于消費者而言,它們消費消息的順序和日志中消息順序一緻。
- 如果Topic的"replicationfactor"為N,那麼允許N-1個kafka執行個體失效,我們将容忍最多N-1個伺服器故障,而不會丢失任何送出到日志的記錄。
1.6 kafka作為消息系統
Kafka的流概念與傳統的企業郵件系統相比如何?
(1)傳統消息系統
消息傳統上有兩種模型:queuing排隊 and publish-subscribe釋出 - 訂閱。在隊列中,消費者池可以從伺服器讀取并且每個記錄轉到其中一個; 在釋出 - 訂閱中,記錄被廣播給所有消費者。這兩種模型中的每一種都有優點和缺點。排隊的優勢在于它允許您在多個消費者執行個體上劃分資料處理,進而可以擴充您的處理。不幸的是,一旦一個程序讀取它已經消失的資料,隊列就不是多使用者。釋出 - 訂閱允許您将資料廣播到多個程序,但由于每條消息都發送給每個訂閱者,是以無法進行擴充處理。
卡夫卡的消費者群體概念概括了這兩個概念。與隊列一樣,使用者組允許您将處理劃分為一組程序(使用者組的成員)。與釋出 - 訂閱一樣,Kafka允許您向多個消費者組廣播消息。
(2)kafka 的優勢
Kafka模型的優勢在于每個主題都具有這些屬性 - 它可以擴充處理并且也是多使用者 - 不需要選擇其中一個。
與傳統的消息系統相比,Kafka具有更強的訂購保證。
傳統隊列在伺服器上按順序保留記錄,如果多個消費者從隊列中消耗,則伺服器按照存儲順序分發記錄。但是,雖然伺服器按順序分發記錄,但是記錄是異步傳遞給消費者的,是以它們可能會在不同的消費者處出現故障。這實際上意味着在存在并行消耗的情況下丢失記錄的順序。消息傳遞系統通常通過具有“獨占消費者”概念來解決這個問題,該概念隻允許一個程序從隊列中消耗,但當然這意味着進行中沒有并行性。
kafka做得更好。通過在主題中具有并行性概念 - 分區 - ,Kafka能夠在消費者流程池中提供訂購保證和負載平衡。這是通過将主題中的分區配置設定給使用者組中的使用者來實作的,以便每個分區僅由該組中的一個使用者使用。通過這樣做,我們確定使用者是該分區的唯一讀者并按順序使用資料。由于有許多分區,這仍然可以平衡許多消費者執行個體的負載。但請注意,消費者組中的消費者執行個體不能超過分區。
1.7 kafka作為存儲系統
- 任何允許釋出與消費消息分離的消息的消息隊列實際上充當了正在進行的消息的存儲系統。Kafka的不同之處在于它是一個非常好的存儲系統。
- 寫入Kafka的資料将寫入磁盤并進行複制以實作容錯。Kafka允許生産者等待确認,以便在完全複制之前寫入不被認為是完整的,并且即使寫入的伺服器失敗也保證寫入仍然存在。
- 磁盤結構Kafka很好地使用了規模 - 無論伺服器上有50 KB還是50 TB的持久資料,Kafka都會執行相同的操作。
- 由于認真對待存儲并允許用戶端控制其讀取位置,您可以将Kafka視為一種專用于高性能,低延遲送出日志存儲,複制和傳播的專用分布式檔案系統。
1.8 kafka用于流處理
- 僅僅讀取,寫入和存儲資料流是不夠的,目的是實作流的實時處理。
- 在Kafka中,流處理器是指從輸入主題擷取連續資料流,對此輸入執行某些處理以及生成連續資料流以輸出主題的任何内容。
- 例如,零售應用程式可能會接收銷售和發貨的輸入流,并輸出重新排序流和根據此資料計算的價格調整。
- 可以使用生産者和消費者API直接進行簡單處理。但是,對于更複雜的轉換,Kafka提供了完全內建的Streams API。這允許建構執行非平凡處理的應用程式,這些應用程式可以計算流的聚合或将流連接配接在一起。
- 此工具有助于解決此類應用程式面臨的難題:處理無序資料,在代碼更改時重新處理輸入,執行有狀态計算等。
- 流API建構在Kafka提供的核心原語上:它使用生産者和消費者API進行輸入,使用Kafka進行有狀态存儲,并在流處理器執行個體之間使用相同的組機制來實作容錯。
2、kafka使用場景
2.1 消息Messaging
Kafka可以替代更傳統的消息代理。消息代理的使用有多種原因(将處理與資料生成器分離,緩沖未處理的消息等)。與大多數消息傳遞系統相比,Kafka具有更好的吞吐量,内置分區,複制和容錯功能,這使其成為大規模消息處理應用程式的理想解決方案。
根據經驗,消息傳遞的使用通常相對較低,但可能需要較低的端到端延遲,并且通常取決于Kafka提供的強大的耐用性保證。
在這個領域,Kafka可與傳統的消息傳遞系統(如ActiveMQ或 RabbitMQ)相媲美。
2.2 網站活動跟蹤
Kafka的原始用例是能夠将使用者活動跟蹤管道重建為一組實時釋出 - 訂閱源。這意味着站點活動(頁面檢視,搜尋或使用者可能采取的其他操作)将釋出到中心主題,每個活動類型包含一個主題。這些源可用于訂購一系列用例,包括實時處理,實時監控以及加載到Hadoop或離線資料倉庫系統以進行脫機處理和報告。
活動跟蹤通常非常高,因為為每個使用者頁面視圖生成了許多活動消息。
2.3 度量Metrics
Kafka通常用于營運監控資料。這涉及從分布式應用程式聚合統計資訊以生成操作資料的集中式提要。
2.4 日志聚合
許多人使用Kafka作為日志聚合解決方案的替代品。日志聚合通常從伺服器收集實體日志檔案,并将它們放在中央位置(可能是檔案伺服器或HDFS)進行處理。Kafka抽象出檔案的細節,并将日志或事件資料作為消息流更清晰地抽象出來。這允許更低延遲的處理并更容易支援多個資料源和分布式資料消耗。與Scribe或Flume等以日志為中心的系統相比,Kafka提供了同樣出色的性能,由于複制而具有更強的耐用性保證,以及更低的端到端延遲。
2.5 流處理
許多Kafka使用者在處理由多個階段組成的管道時處理資料,其中原始輸入資料從Kafka主題中消費,然後聚合,豐富或以其他方式轉換為新主題以供進一步消費或後續處理。
例如,用于推薦新聞文章的處理管道可以從RSS訂閱源抓取文章内容并将其釋出到“文章”主題; 進一步處理可能會對此内容進行規範化或重複資料删除,并将已清理的文章内容釋出到新主題; 最終處理階段可能會嘗試向使用者推薦此内容。此類處理管道基于各個主題建立實時資料流的圖形。從0.10.0.0開始,這是一個輕量級但功能強大的流處理庫,名為Kafka Streams 在Apache Kafka中可用于執行如上所述的此類資料處理。除了Kafka Streams之外,其他開源流處理工具包括Apache Storm和 Apache Samza。
2.6 Event Sourcing
Event Sourcing是一種應用程式設計風格,其中狀态更改記錄為按時間排序的記錄序列。Kafka對非常大的存儲日志資料的支援使其成為以這種風格建構的應用程式的出色後端。
2.7 送出日志
Kafka可以作為分布式系統的一種外部送出日志。該日志有助于在節點之間複制資料,并充當故障節點恢複其資料的重新同步機制。Kafka中的日志壓縮功能有助于支援此用法。在這種用法中,Kafka類似于Apache BookKeeper項目。
3、kafka安裝
3.1 下載下傳安裝
到官網http://kafka.apache.org/downloads.html下載下傳想要的版本。
注:由于Kafka控制台腳本對于基于Unix和Windows的平台是不同的,是以在Windows平台上使用bin\windows\ 而不是bin/ 将腳本擴充名更改為.bat。
[[email protected] ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
[[email protected] ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
[[email protected] ~]# cd /data/kafka_2.11-2.1.0/
3.2 配置啟動zookeeper
kafka正常運作,必須配置zookeeper,否則無論是kafka叢集還是用戶端的生存者和消費者都無法正常的工作的;是以需要配置啟動zookeeper服務。
(1)zookeeper需要java環境
[[email protected] ~]# yum -y install java-1.8.0
(2)這裡kafka下載下傳包已經包括zookeeper服務,是以隻需修改配置檔案,啟動即可。
如果需要下載下傳指定zookeeper版本;可以單獨去zookeeper官網http://mirrors.shu.edu.cn/apache/zookeeper/下載下傳指定版本。
[[email protected] ~]# cd /data/kafka_2.11-2.1.0/
[[email protected] kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties
dataDir=/tmp/zookeeper #資料存儲目錄
clientPort=2181 #zookeeper端口
maxClientCnxns=0
注:可自行添加修改zookeeper配置
3.3 配置kafka
(1)修改配置檔案
[[email protected] kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
注:可根據自己需求修改配置檔案
broker.id:#唯一辨別ID
listeners=PLAINTEXT://localhost:9092:#kafka服務監聽位址和端口
log.dirs:#日志存儲目錄
zookeeper.connect:#指定zookeeper服務
(2)配置環境變量
[[email protected] ~]# vim /etc/profile.d/kafka.sh
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
export PATH="${KAFKA_HOME}/bin:$PATH"
[[email protected] ~]# source /etc/profile.d/kafka.sh
(3)配置服務啟動腳本
[[email protected] ~]# vim /etc/init.d/kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
source /etc/rc.d/init.d/functions
KAFKA_HOME=/data/kafka_2.11-2.1.0
KAFKA_USER=root
export LOG_DIR=/tmp/kafka-logs
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
# See how we were called.
case "$1" in
start)
echo -n "Starting Kafka:"
/sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
echo " done."
exit 0
;;
stop)
echo -n "Stopping Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"
echo " done."
exit 0
;;
hardstop)
echo -n "Stopping (hard) Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
echo " done."
exit 0
;;
status)
c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
if [ "$c_pid" = "" ] ; then
echo "Stopped"
exit 3
else
echo "Running $c_pid"
exit 0
fi
;;
restart)
stop
start
;;
*)
echo "Usage: kafka {start|stop|hardstop|status|restart}"
exit 1
;;
esac
3.4 啟動kafka服務
(1)背景啟動zookeeper服務
[[email protected] ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &
(2)啟動kafka服務
[[email protected] ~]# service kafka start
Starting kafka (via systemctl): [ OK ]
[[email protected] ~]# service kafka status
Running 86018
[[email protected] ~]# ss -nutl
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
tcp LISTEN 0 50 :::9092 :::*
tcp LISTEN 0 50 :::2181 :::*
4、kafka使用簡單入門
4.1 建立主題topics
建立一個名為“along”的主題,它隻包含一個分區,隻有一個副本:
搜尋公衆号後端架構師背景回複“架構整潔”,擷取一份驚喜禮包。
[[email protected] ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
Created topic "along".
如果我們運作list topic指令,我們現在可以看到該主題:
[[email protected] ~]# kafka-topics.sh --list --zookeeper localhost:2181
along
4.2 發送一些消息
Kafka附帶一個指令行用戶端,它将從檔案或标準輸入中擷取輸入,并将其作為消息發送到Kafka叢集。預設情況下,每行将作為單獨的消息發送。
運作生産者,然後在控制台中鍵入一些消息以發送到伺服器。
[[email protected] ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
>This is a message
>This is another message
4.3 啟動消費者
Kafka還有一個指令行使用者,它會将消息轉儲到标準輸出。
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
This is a message
This is another message
所有指令行工具都有其他選項; 運作不帶參數的指令将顯示更詳細地記錄它們的使用資訊。
5、設定多代理kafka群集
到目前為止,我們一直在與一個broker運作,但這并不好玩。對于Kafka,單個代理隻是一個大小為1的叢集,是以除了啟動一些代理執行個體之外沒有太多變化。但是為了感受它,讓我們将我們的叢集擴充到三個節點(仍然在我們的本地機器上)。
5.1 準備配置檔案
[[email protected] kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
[[email protected] kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
[[email protected] kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
[[email protected] kafka_2.11-2.1.0]# vim config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
[[email protected] kafka_2.11-2.1.0]# vim config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
注:該broker.id 屬性是群集中每個節點的唯一且永久的名稱。我們必須覆寫端口和日志目錄,因為我們在同一台機器上運作這些,并且我們希望讓所有代理嘗試在同一端口上注冊或覆寫彼此的資料。
5.2 開啟叢集另2個kafka服務
[[email protected] ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &
[[email protected] ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &
[[email protected] ~]# ss -nutl
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::*
tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
tcp LISTEN 0 50 ::ffff:127.0.0.1:9094 :::*
5.3 在叢集中進行操作
(1)現在建立一個複制因子為3的新主題my-replicated-topic
[[email protected] ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
(2)在一個叢集中,運作“describe topics”指令檢視哪個broker正在做什麼
[[email protected] ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
#注釋:第一行給出了所有分區的摘要,每個附加行提供有關一個分區的資訊。由于我們隻有一個分區用于此主題,是以隻有一行。
#“leader”是負責給定分區的所有讀取和寫入的節點。每個節點将成為随機選擇的分區部分的上司者。
#“replicas”是複制此分區日志的節點清單,無論它們是否為上司者,或者即使它們目前處于活動狀态。
# “isr”是“同步”複制品的集合。這是副本清單的子集,該清單目前處于活躍狀态并且已經被上司者捕獲。
#請注意,Leader: 2,在我的示例中,節點2 是該主題的唯一分區的Leader。
(3)可以在我們建立的原始主題上運作相同的指令,以檢視它的位置
[[email protected] ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along
Topic:along PartitionCount:1 ReplicationFactor:1 Configs:
Topic: along Partition: 0 Leader: 0 Replicas: 0 Isr: 0
(4)向我們的新主題釋出一些消息:
[[email protected] ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>my test message 1
>my test message 2
(5)現在讓我們使用這些消息:
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
5.4 測試叢集的容錯性
(1)現在讓我們測試一下容錯性。Broker 2 充當leader 是以讓我們殺了它:
[[email protected] ~]# ps aux | grep server-2.properties |awk '{print $2}'
106737
[[email protected] ~]# kill -9 106737
[[email protected] ~]# ss -nutl
tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::*
tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
(2)leader 已切換到其中一個從屬節點,節點2不再位于同步副本集中:
[[email protected] ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
(3)即使最初接受寫入的leader 已經失敗,這些消息仍可供消費:
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
my test message 1
my test message 2
6、使用Kafka Connect導入/導出資料
從控制台寫入資料并将其寫回控制台是一個友善的起點,但有時候可能希望使用其他來源的資料或将資料從Kafka導出到其他系統。對于許多系統,您可以使用Kafka Connect導入或導出資料,而不是編寫自定義內建代碼。
Kafka Connect是Kafka附帶的工具,用于向Kafka導入和導出資料。它是一個可擴充的工具,運作連接配接器,實作與外部系統互動的自定義邏輯。在本快速入門中,我們将了解如何使用簡單的連接配接器運作Kafka Connect,這些連接配接器将資料從檔案導入Kafka主題并将資料從Kafka主題導出到檔案。
(1)首先建立一些種子資料進行測試:
[[email protected] ~]# echo -e "foo\nbar" > test.txt
或者在Windows上:
> echo foo> test.txt
> echo bar>> test.txt
(2)接下來,啟動兩個以獨立模式運作的連接配接器,這意味着它們在單個本地專用程序中運作。提供三個配置檔案作為參數。
第一個始終是Kafka Connect流程的配置,包含常見配置,例如要連接配接的Kafka代理和資料的序列化格式。
其餘配置檔案均指定要建立的連接配接器。這些檔案包括唯一的連接配接器名稱,要執行個體化的連接配接器類以及連接配接器所需的任何其他配置。
[[email protected] ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
[2019-01-16 16:16:31,903] INFO WorkerInfo values:
... ...
#注:Kafka附帶的這些示例配置檔案使用您之前啟動的預設本地群集配置并建立兩個連接配接器:第一個是源連接配接器,它從輸入檔案讀取行并生成每個Kafka主題,第二個是宿連接配接器從Kafka主題讀取消息并将每個消息生成為輸出檔案中的一行。
(3)驗證是否導入成功(另起終端)
在啟動過程中,您将看到許多日志消息,包括一些訓示正在執行個體化連接配接器的日志消息。
① 一旦Kafka Connect程序啟動,源連接配接器應該開始從test.txt主題讀取行并将其生成到主題connect-test,并且接收器連接配接器應該開始從主題讀取消息connect-test 并将它們寫入檔案test.sink.txt。我們可以通過檢查輸出檔案的内容來驗證資料是否已認證整個管道傳遞:
[[email protected] ~]# cat test.sink.txt
foo
bar
② 請注意,資料存儲在Kafka主題中connect-test,是以我們還可以運作控制台使用者來檢視主題中的資料(或使用自定義使用者代碼來處理它):
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
(4)繼續追加資料,驗證
[[email protected] ~]# echo Another line>> test.txt
[[email protected] ~]# cat test.sink.txt
foo
bar
Another line
[[email protected] ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"
你還有什麼想要補充的嗎?PS:歡迎在留言區留下你的觀點,一起讨論提高。如果今天的文章讓你有新的啟發,歡迎轉發分享給更多人。
版權申明:内容來源網絡,版權歸原創者所有。除非無法确認,我們都會标明作者及出處,如有侵權煩請告知,我們會立即删除并表示歉意。謝謝!
歡迎加入後端架構師交流群,在背景回複“學習”即可。
最近面試BAT,整理一份面試資料《Java面試BAT通關手冊》,覆寫了Java核心技術、JVM、Java并發、SSM、微服務、資料庫、資料結構等等。在這裡,我為大家準備了一份2021年最新最全BAT等大廠Java面試經驗總結。别找了,想擷取史上最簡單的Java大廠面試題學習資料掃下方二維碼回複「面試」就好了猜你還想看阿裡、騰訊、百度、華為、京東最新面試題彙集我終于決定要放棄 okhttp、httpClient,選擇了這個牛逼的神仙工具!賊爽滴滴出行全網下架後,又有三款APP被緊急調查
.apk 成為曆史!
嘿,你在看嗎?