天天看点

【Kafka笔记】2.Kafka常用命令操作

简介

kafka可以使用java等编程语言实现topic以及消费消息,生产消息的操作,同样命令行也可以实现这些操作。

Kafka的topic的增删改查

对于kafka的topic操作,我们需要用到的是

bin/kafka-topics.sh

这个脚本文件。

[email protected]:/opt/module/kafka_2.11-0.11.0.2# pwd
/opt/module/kafka_2.11-0.11.0.2
           
  1. 查看当前服务器中所有的topic
    bin/kafka-topics.sh --list --zookeeper master:2181 
    __consumer_offsets
               
    只有一个topic,kafka的集群信息依赖于zookeeper,所以我们需要使用

    --zookeeper

    连接zookeeper获取列表
  2. 创建topic主题
    bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
    Created topic "test".
               
    选项说明:

    --replication-factor 3

    定义副本数,副本数不能超过当前broker数,我这里有三个broker,所以最大可以填写3

    --partitions 2

    定义分区数,分区数无限制数量大小

    --topic test

    定义主题名称
    【Kafka笔记】2.Kafka常用命令操作
  3. 删除topic
    bin/kafka-topics.sh --delete --zookeeper master:2181 --topic test5
               
    删除topic需要注意,我们要确保

    server.properties

    中的

    delete.topic.enable=true

    为true才可以,不然只是把这个topic标记为删除,但是却还是存在的,如果创建同名的topoic,就会提示该topic已存在。
  4. topic的详情查看
    bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test5
               
    描述该主题的详细信息:
    【Kafka笔记】2.Kafka常用命令操作
    输出解释:第一行是所有分区的摘要,其次,每一行提供一个分区信息。
    1. “leader”:该节点负责该分区的所有的读和写,每个节点的leader都是随机选择的。
    2. “replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
    3. “isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。

      这里的0,1,2 都是

      broker.id

      ,我这里是启动了三个kafka线程。
      【Kafka笔记】2.Kafka常用命令操作
      【Kafka笔记】2.Kafka常用命令操作
      图中replicas(副本)就是Leader的节点

Kafka的topic的消息发送和消费

生产消息需要使用

kafka-console-producer.sh

消费消息需要使用

kafka-console-consumer.sh

生产者生产消息

使用

bin/kafka-console-producer.sh --broker-list master:9092 --topic test5

可以向topic test5 发送消息

这里的

--broker-list kafka集群

跟的是kafka集群

这里可以看到当出现

>

时就是你可以输入你的消息了,topic就会接收你的消息

【Kafka笔记】2.Kafka常用命令操作

消费者消费消息

消费消息需要使用

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test5

新版本都是使用

--bootstrap-server

老版本使用的是

--zookeeper

都可以用,建议新版本,

--from-beginnign

表示从topic的的头开始读取消息

【Kafka笔记】2.Kafka常用命令操作

该进程会一直运行,当有新消息进来,这里会直接读取出来消息。

当有Leader节点出现错误时,会在剩余的follower中推举出一个leader,而且这些数据还没有丢失,因为follower是leader的备份节点。

注意:

在老版本中,消费者的offset会存储在zookeeper中,但是在新版本中已经修改为存储在kafka中,下图就是存储在kafka的offset的记录,默认50个分区。

【Kafka笔记】2.Kafka常用命令操作

Kafka Connect 导入和导出

kafka Connect可以从其他来源导入或者导出数据到其他系统。对于大多数系统,可以使用kafka Connect,而不需要编写自定义集成代码。

kafka Connect

是导入和导出数据的一个工具,是一个扩展的工具,运行连接器,实现与自定义的逻辑的外部系统交互。

实例:

  1. 创建一些文本消息
    [email protected]:~# cd /home/
    [email protected]:/home# mkdir file
    [email protected]:/home# cd file/
    [email protected]:/home/file# echo -e "foo\nyes\nhahah\n6666" > text.txt
    [email protected]:/home/file# cat text.txt 
    foo
    yes
    hahah
    6666
    [email protected]:/home/file# 
               
  2. 开启两个连接器运行在独立模式

    独立模式意味着运行一个单一的,本地的,专用的进程。使用的是

    bin/connect-standalone.sh

    这个配置文件

    执行

    bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

    这三个文件作为三个参数,第一个是Kafka Connect处理的配置,包含常见的配置,例如要连接的Kafka broker和数据的序列化格式。其余的配置文件都指定了要创建的连接器。包括连接器唯一名称,和要实例化的连接器类。以及连接器所需要的任何其他配置。

    下面是我修改的配置文件,还有一点如果topic没有创建,呢么生产消息的时候,kafka会默认给你创建该topic的

    connect-standalone.properties

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    # These are defaults. This file just demonstrates how to override some settings.
    # !!!!这里要修改为你自己的主机名
    bootstrap.servers=master:9092
    
    # The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
    # need to configure these based on the format they want their data in when loaded from or stored into Kafka
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    # Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
    # it to
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    
    # The internal converter used for offsets and config data is configurable and must be specified, but most users will
    # always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    offset.storage.file.filename=/tmp/connect.offsets
    # Flush much faster than normal, which is useful for testing/debugging
    offset.flush.interval.ms=10000
    
    # Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
    # (connectors, converters, transformations). The list should consist of top level directories that include 
    # any combination of: 
    # a) directories immediately containing jars with plugins and their dependencies
    # b) uber-jars with plugins and their dependencies
    # c) directories immediately containing the package directory structure of classes of plugins and their dependencies
    # Note: symlinks will be followed to discover dependencies or plugins.
    # Examples: 
    # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
    #plugin.path=
               
    connect-file-source.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    # 指定读取的文件路径
    file=/home/file/text.txt
    # 指定你的topic
    topic=connect-lzx
               
    connect-file-sink.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    # 指定消息输出文件路径
    file=/home/file/test.sink.txt
    topics=connect-lzx
               

执行运行的shell命令,一旦进程开始,导入连接器读取

text.txt

文件内容写入到

connect-lzx

主题,导出连接器从主题

connect-lzx

读取消息写入到文件

test.sink.txt

,而且可以看到

connect-lzx

的topic已经创建了

【Kafka笔记】2.Kafka常用命令操作

这里也有输出文件了,两个文件是一摸一样的,如果你在text.txt追加内容,输出文件也会从kafka的主题中消费消息。

【Kafka笔记】2.Kafka常用命令操作

也可以使用消费者的shell查看内容:

[email protected]:/opt/module/kafka_2.11-0.11.0.2# bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic connect-lzx --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"yes"}
{"schema":{"type":"string","optional":false},"payload":"hahah"}
{"schema":{"type":"string","optional":false},"payload":"6666"}
           

继续阅读