本文我們主要講 Kafka 的安裝和使用,對于 Kafka 的簡介和功能作用就不再詳細介紹,給大家推薦一篇文章,可以自行了解: https://blog.csdn.net/weixin_42238150/article/details/107449787
1. 下載下傳kafka
Kafka 官網:
http://kafka.apache.org/ 下載下傳位址: http://archive.apache.org/dist/kafka/2.3.1/ 因為國外網址,下載下傳速度很慢,建議大家使用鏡像位址下載下傳: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.1/kafka_2.13-2.5.1.tgz 鏡像下載下傳連結: https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.1/kafka_2.13-2.5.1.tgz2. 在 Linux 下安裝Kafka
2.1 使用Xftp 将Kafka 傳輸到雲伺服器/虛拟機指定位置
我這裡放到:
/usr/local/src/software下
2.2 使用Xshell 連接配接雲伺服器/虛拟機
進入放置 Kafka 壓縮包的檔案夾下,解壓壓縮包:
tar -zxvf kafka_2.13-2.5.1.tgz
然後
cd
指令進入解壓後的檔案夾:
- bin 目錄中存放一些可執行的指令,比如啟動關閉等
- config 目錄中存放相關配置檔案
- libs 目錄存放一些依賴包
進入bin目錄檢視:
2.3 在config目錄中修改相關配置
zookeeper.properties
首先我們在/usr/local/src/software/kafka_2.13-2.5.1 下建立一個 data 檔案夾,用于存放zookeeper運作後的資料:mkdir data,建立成功後進入該檔案夾并執行pwd,然後拷貝路徑位址/usr/local/src/software/kafka_2.13-2.5.1/data
然後進入 config 目錄修改 zookeeper.properties 配置檔案:vim zookeeper.properties
修改完成後,wq 儲存退出即可!
server.properties
接下來我們修改下 server.properties 配置檔案,同樣在 config 目錄下修改配置檔案 vim server.properties
在 vim 中收入/log.dir 快速定位到要修改的位置:
我們将其修改為:(data 路徑還是剛才複制的路徑)
# A comma separated list of directories under which to store log files
log.dirs=/usr/local/src/software/kafka_2.13-2.5.1/data/kafka-logs
然後wq 儲存退出即可!
3. 啟動Kafka
3.1 先啟動 Zookeeper 再啟動Kafka
因為 Kafka 依賴于 Zookeeper 是以先啟動後者!
進入 bin 目錄下執行 :./zookeeper-server-start.sh ../config/zookeeper.properties 指令,以指定配置檔案啟動 zookeeper 服務,守護程序啟動方式,在結尾加一個 & 即可;
zookeeper 服務啟動後,打開一個新的指令視窗,執行./kafka-server-start.sh ../config/server.properties 指令,啟動 Kafka服務,守護程序啟動方式,在結尾加一個 & 即可;
然後我們可以在剛才建立的 data 目錄下看到 Kafka 和 zookeeper 的啟動日志檔案
注意:如果使用的是雲伺服器,提前開放安全組端口:
上圖2189 和 9092 是我的雲伺服器開放的端口!
如果使用的是虛拟機請不要忘記防火牆放行端口!
3.2 關閉zookeeper 和 kafka
在 bin 目錄下關閉 zookeeper ./zookeeper-server-stop.sh
關閉kafka ./kafka-server-stop.sh
4. 使用Kafka
這裡我們以Kafka消息釋出訂閱(topics)功能為例,來啟動相關的檔案
4.1 建立主題
進入 bin 目錄,執行 ./kafka-topics.sh --create --bootstrap-server xxx.xxxx.xxx:9092 --replication-factor 1 --partitions 1 --topic test 指令,建立一個主題
4.2 檢視所有建立的主題清單
執行指令:
./kafka-topics.sh --list --bootstrap-server xxx.xxxx.xxx:9092
4.3 生産者(producer)向Kafka某個主題上發送消息
./kafka-console-producer.sh --broker-list xxx.xxxx.xxx:9092 --topic test
4.4 消費者(consumer)從Kafka某個主題上接收消息
./kafka-console-consumer.sh --bootstrap-server 8.131.66.136:9092 --topic test --from-beginning
我們再從生産者生産一條消息 haha 時候,消費者扔繼續會讀取消息!
參考文章:
https://blog.csdn.net/qq_29308413/article/details/84893539如果啟動 Kafka 報錯:The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.,參考文章:
https://www.cnblogs.com/qi-yuan-008/p/13912143.html5. SpringBoot 整合Kafka
建立一個Spring Boot 項目:
5.1 引入依賴
pom.xml 中引入Kafka依賴:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>
5.2 配置Kafka
# spring 相關配置
spring:
# kafka 配置
kafka:
# 伺服器位址及其 kafka 端口
bootstrap-servers: 8.131.66.136:9092
# 消費者配置
consumer:
# 消費者分組的id,可以在 kafka 的 config 目錄下的 consumer.properties 中配置
group-id: test-consumer-group
# 是否自動送出消費者的偏移量
enable-auto-commit: true
# 自動送出的頻率 3s
auto-commit-interval: 3000
5.3 測試使用
生産者
/**
* 生産者
*/
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;// 注入kafkaTemplate
/**
* 發送消息
*
* @param topic 消息主題
* @param content 消息内容
*/
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
消費者
/**
* 消費者
*/
@Component
class KafkaConsumer {
/**
* 消費者訂閱的主題為test
* 就是通過 @KafkaListener(topics = {"test"}) 注解實作的
*
* @param record 接收的消息被封裝成 ConsumerRecord 對象
*/
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
Test測試
@SpringBootTest
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;// 注入生産者kafkaProducer
@Test
public void testKafka() {
// 生産者發送消息
kafkaProducer.sendMessage("test", "Hello 你好!");
kafkaProducer.sendMessage("test", "在嗎?");
// 在這裡進行一下線程阻塞,模仿消費者消費消息的過程
try {
Thread.sleep(1000 * 3);// 10s
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試結果:
踩坑:連遠端Kafka報錯UnknownHostException
這個問題參考這篇文章即可解決:
https://blog.csdn.net/chang_harry/article/details/103501369