天天看點

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

本文我們主要講 Kafka 的安裝和使用,對于 Kafka 的簡介和功能作用就不再詳細介紹,給大家推薦一篇文章,可以自行了解: https://blog.csdn.net/weixin_42238150/article/details/107449787

1. 下載下傳kafka

Kafka 官網:

http://kafka.apache.org/ 下載下傳位址: http://archive.apache.org/dist/kafka/2.3.1/
Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)
因為國外網址,下載下傳速度很慢,建議大家使用鏡像位址下載下傳: https://www.apache.org/dyn/closer.cgi?path=/kafka/2.5.1/kafka_2.13-2.5.1.tgz
Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)
鏡像下載下傳連結: https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.1/kafka_2.13-2.5.1.tgz

2. 在 Linux 下安裝Kafka

2.1 使用Xftp 将Kafka 傳輸到雲伺服器/虛拟機指定位置

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

我這裡放到:

/usr/local/src/software

2.2 使用Xshell 連接配接雲伺服器/虛拟機

進入放置 Kafka 壓縮包的檔案夾下,解壓壓縮包:

tar -zxvf kafka_2.13-2.5.1.tgz

然後

cd

指令進入解壓後的檔案夾:

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)
  • bin 目錄中存放一些可執行的指令,比如啟動關閉等
  • config 目錄中存放相關配置檔案
  • libs 目錄存放一些依賴包

進入bin目錄檢視:

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

2.3 在config目錄中修改相關配置

zookeeper.properties

首先我們在/usr/local/src/software/kafka_2.13-2.5.1 下建立一個 data 檔案夾,用于存放zookeeper運作後的資料:mkdir data,建立成功後進入該檔案夾并執行pwd,然後拷貝路徑位址/usr/local/src/software/kafka_2.13-2.5.1/data

然後進入 config 目錄修改 zookeeper.properties 配置檔案:vim zookeeper.properties

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

修改完成後,wq 儲存退出即可!

server.properties

接下來我們修改下 server.properties 配置檔案,同樣在 config 目錄下修改配置檔案 vim server.properties

在 vim 中收入/log.dir 快速定位到要修改的位置:

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)
Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

我們将其修改為:(data 路徑還是剛才複制的路徑)

# A comma separated list of directories under which to store log files
log.dirs=/usr/local/src/software/kafka_2.13-2.5.1/data/kafka-logs
      

然後wq 儲存退出即可!

3. 啟動Kafka

3.1 先啟動 Zookeeper 再啟動Kafka

因為 Kafka 依賴于 Zookeeper 是以先啟動後者!

進入 bin 目錄下執行 :./zookeeper-server-start.sh ../config/zookeeper.properties 指令,以指定配置檔案啟動 zookeeper 服務,守護程序啟動方式,在結尾加一個 & 即可;

zookeeper 服務啟動後,打開一個新的指令視窗,執行./kafka-server-start.sh ../config/server.properties 指令,啟動 Kafka服務,守護程序啟動方式,在結尾加一個 & 即可;

然後我們可以在剛才建立的 data 目錄下看到 Kafka 和 zookeeper 的啟動日志檔案

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

注意:如果使用的是雲伺服器,提前開放安全組端口:

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

上圖2189 和 9092 是我的雲伺服器開放的端口!

如果使用的是虛拟機請不要忘記防火牆放行端口!

3.2 關閉zookeeper 和 kafka

在 bin 目錄下關閉 zookeeper ./zookeeper-server-stop.sh

關閉kafka ./kafka-server-stop.sh

4. 使用Kafka

這裡我們以Kafka消息釋出訂閱(topics)功能為例,來啟動相關的檔案

4.1 建立主題

進入 bin 目錄,執行 ./kafka-topics.sh --create --bootstrap-server xxx.xxxx.xxx:9092 --replication-factor 1 --partitions 1 --topic test 指令,建立一個主題

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

4.2 檢視所有建立的主題清單

執行指令:

./kafka-topics.sh --list --bootstrap-server xxx.xxxx.xxx:9092

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

4.3 生産者(producer)向Kafka某個主題上發送消息

./kafka-console-producer.sh --broker-list xxx.xxxx.xxx:9092 --topic test

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

4.4 消費者(consumer)從Kafka某個主題上接收消息

./kafka-console-consumer.sh --bootstrap-server 8.131.66.136:9092 --topic test --from-beginning

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

我們再從生産者生産一條消息 haha 時候,消費者扔繼續會讀取消息!

參考文章:

https://blog.csdn.net/qq_29308413/article/details/84893539

如果啟動 Kafka 報錯:The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.,參考文章:

https://www.cnblogs.com/qi-yuan-008/p/13912143.html

5. SpringBoot 整合Kafka

建立一個Spring Boot 項目:

5.1 引入依賴

pom.xml 中引入Kafka依賴:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>
      

5.2 配置Kafka

# spring 相關配置
spring:
  # kafka 配置
  kafka:
    # 伺服器位址及其 kafka 端口
    bootstrap-servers: 8.131.66.136:9092
    # 消費者配置
    consumer:
      # 消費者分組的id,可以在 kafka 的 config 目錄下的 consumer.properties 中配置
      group-id: test-consumer-group
      # 是否自動送出消費者的偏移量
      enable-auto-commit: true
      # 自動送出的頻率 3s
      auto-commit-interval: 3000
      

5.3 測試使用

生産者

/**
 * 生産者
 */
@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;// 注入kafkaTemplate

    /**
     * 發送消息
     *
     * @param topic   消息主題
     * @param content 消息内容
     */
    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}
      

消費者

/**
 * 消費者
 */
@Component
class KafkaConsumer {

    /**
     * 消費者訂閱的主題為test
     * 就是通過 @KafkaListener(topics = {"test"}) 注解實作的
     *
     * @param record 接收的消息被封裝成 ConsumerRecord 對象
     */
    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}
      

Test測試

@SpringBootTest
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;// 注入生産者kafkaProducer

    @Test
    public void testKafka() {
        // 生産者發送消息
        kafkaProducer.sendMessage("test", "Hello 你好!");
        kafkaProducer.sendMessage("test", "在嗎?");

        // 在這裡進行一下線程阻塞,模仿消費者消費消息的過程
        try {
            Thread.sleep(1000 * 3);// 10s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
      

測試結果:

Linux環境下Kafka的安裝與使用(SpringBoot整合雲伺服器上的Kafka)

踩坑:連遠端Kafka報錯UnknownHostException

這個問題參考這篇文章即可解決:

https://blog.csdn.net/chang_harry/article/details/103501369