Introduction
Kafka topics can be managed, and messages produced and consumed, from programming languages such as Java; the same operations can also be performed from the command line.
Creating, deleting, modifying, and inspecting Kafka topics
For topic operations we use the bin/kafka-topics.sh script. The commands below are run from the Kafka installation directory:
```
root@master:/opt/module/kafka_2.11-0.11.0.2# pwd
/opt/module/kafka_2.11-0.11.0.2
```
- List all topics on the current server
Kafka's cluster metadata lives in ZooKeeper, so the script connects to ZooKeeper via --zookeeper to fetch the topic list:

```
bin/kafka-topics.sh --list --zookeeper master:2181
__consumer_offsets
```

There is only one topic here, the internal __consumer_offsets topic.
- Create a topic
```
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
```

Option notes:
- --replication-factor: the number of replicas. It cannot exceed the number of brokers; I have three brokers here, so the maximum is 3 (e.g. --replication-factor 3).
- --partitions: the number of partitions. There is no hard limit on the partition count (e.g. --partitions 2).
- --topic: the topic name (e.g. --topic test).
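Putting the options above together, a topic with three replicas (one per broker) and two partitions could be created like this (the topic name test2 is just a placeholder for illustration):

```
bin/kafka-topics.sh --create --zookeeper master:2181 \
  --replication-factor 3 --partitions 2 --topic test2
```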
- Delete a topic
```
bin/kafka-topics.sh --delete --zookeeper master:2181 --topic test5
```

Note that deletion only works if delete.topic.enable=true is set in server.properties. Otherwise the topic is merely marked for deletion but still exists, and creating a topic with the same name will report that the topic already exists.
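For reference, this is the relevant broker setting; in this Kafka version it is not enabled out of the box, so it usually has to be added to each broker's config/server.properties and the broker restarted:

```
# config/server.properties on each broker (restart required after changing)
delete.topic.enable=true
```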
- Describe a topic
Show the topic's details:

```
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test5
```

Reading the output: the first line is a summary of all partitions, and each following line gives the details of one partition.
- "leader": the node responsible for all reads and writes for that partition; the leader of each partition is chosen at random.
- "replicas": the list of nodes that replicate this partition, shown regardless of whether a node is the leader or even currently alive.
- "isr": the list of "in-sync replicas", i.e. the nodes that are alive and currently caught up with the leader.

The values 0, 1, 2 here are broker.id values; I am running three Kafka brokers. The numbers listed under replicas are the broker.id of the nodes holding copies of the partition, including the leader.
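For reference, the --describe output looks roughly like this (the broker ids and assignments below are illustrative, not taken from my cluster):

```
Topic:test5	PartitionCount:2	ReplicationFactor:3	Configs:
	Topic: test5	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: test5	Partition: 1	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
```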
Producing and consuming messages on a topic
Producing messages uses kafka-console-producer.sh, and consuming messages uses kafka-console-consumer.sh.
Producing messages
Run

```
bin/kafka-console-producer.sh --broker-list master:9092 --topic test5
```

to send messages to the topic test5. The --broker-list option is followed by the addresses of the Kafka cluster's brokers. When the > prompt appears, you can type your messages and the topic will receive them.
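A sample producer session might look like this (the message text is made up for illustration):

```
bin/kafka-console-producer.sh --broker-list master:9092 --topic test5
>hello kafka
>this is the second message
```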
Consuming messages
Consume messages with

```
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test5
```

Newer versions use --bootstrap-server, while older versions use --zookeeper; both work, but the newer option is recommended. --from-beginning means reading from the beginning of the topic. The process keeps running, and whenever a new message arrives it is read and printed immediately.
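Continuing the hypothetical producer session above, the consumer would print the messages in order and then wait for new ones:

```
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --from-beginning --topic test5
hello kafka
this is the second message
```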
When a leader node fails, a new leader is elected from the remaining followers, and no data is lost, because the followers hold replicas of the leader's data.
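One way to observe this (a sketch, assuming a multi-broker cluster where stopping one broker is acceptable) is to stop the broker that currently leads a partition and describe the topic again; a new leader should appear, elected from the ISR:

```
# On the host running the current leader broker (stops the broker on that machine)
bin/kafka-server-stop.sh
# Back on any node: the Leader column changes, and the stopped broker drops out of Isr
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test5
```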
Note: in older versions, consumer offsets were stored in ZooKeeper; in newer versions they are stored in Kafka itself, in the internal __consumer_offsets topic, which has 50 partitions by default.
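You can inspect that internal topic the same way as any other; with default settings its describe output reports 50 partitions:

```
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic __consumer_offsets
```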
Importing and exporting data with Kafka Connect
Kafka Connect can import data from other sources and export data to other systems. For most systems you can use Kafka Connect instead of writing custom integration code. Kafka Connect is an extensible tool for importing and exporting data: it runs connectors, which implement the custom logic for interacting with external systems.
Example:
- Create some text data
```
root@master:~# cd /home/
root@master:/home# mkdir file
root@master:/home# cd file/
root@master:/home/file# echo -e "foo\nyes\nhahah\n6666" > text.txt
root@master:/home/file# cat text.txt
foo
yes
hahah
6666
```
- Start two connectors in standalone mode
Standalone mode means running a single, local, dedicated process. This uses the bin/connect-standalone.sh script.
Run:

```
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
```

The three files are passed as three arguments. The first is the configuration for the Kafka Connect process itself, containing common settings such as the Kafka brokers to connect to and the serialization format for data. Each remaining file specifies a connector to create: its unique name, the connector class to instantiate, and any other configuration the connector needs.
Below are the configuration files as I modified them. One more note: if the topic does not exist yet, Kafka will create it automatically when messages are first produced to it.

connect-standalone.properties:

```properties
# These are defaults. This file just demonstrates how to override some settings.
# !!!! Change this to your own hostname
bootstrap.servers=master:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the converter's setting with the
# converter we want to apply it to.
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation
# for plugins (connectors, converters, transformations).
#plugin.path=
```

connect-file-source.properties:

```properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
# Path of the file to read
file=/home/file/text.txt
# Your topic
topic=connect-lzx
```

connect-file-sink.properties:

```properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
# Path of the output file for messages
file=/home/file/test.sink.txt
topics=connect-lzx
```
Run the shell command above. Once the process starts, the source connector reads the contents of text.txt and writes them to the connect-lzx topic, while the sink connector reads messages from the connect-lzx topic and writes them to the file test.sink.txt. You can also see that the connect-lzx topic has been created.
The output file is there as well, and the two files are identical. If you append content to text.txt, the sink connector will consume the new messages from the Kafka topic and append them to the output file.
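To check that the pipeline keeps running end to end (the appended text here is made up), add a line to the source file and watch the sink file pick it up:

```
echo "another line" >> /home/file/text.txt
# after a moment, the new line shows up in the sink file
cat /home/file/test.sink.txt
```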
You can also view the contents with the console consumer:
```
root@master:/opt/module/kafka_2.11-0.11.0.2# bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic connect-lzx --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"yes"}
{"schema":{"type":"string","optional":false},"payload":"hahah"}
{"schema":{"type":"string","optional":false},"payload":"6666"}
```