天天看点

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

Kafka群集中的每个主机都运行一个称为代理的服务器,该服务器存储发送到主题的消息并服务于消费者请求。

首先先看服务器安装kafka的实例信息:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

注意:

然后正常kafka的指令是 :  ./bin/kafka-topics.sh --zookeeper cluster2-4:2181 .......

但是使用CDH安装的kafka则不需要全写出此  ./bin/kafka-topics.sh  部分。只许直接写 kafka-topics 即可,这是很重要的一个区别,使用CDH安装的kafka时候要特别注意一下。

具体有哪些指令可以看此路径下:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

接下来测试topic指令,这里我们要先看CDH中配置的这个ZooKeeper Root的Kafka服务范围为: " /kafka "。

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

所以我们使用topic的指令格式应该都类似:

kafka-topics --zookeeper cluster2-4:2181/kafka ......

A.创建一个名为 test 的主题(Topic):

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

Or

若是上述中的 ZooKeeper Root 的Kafka服务范围为: " / "。则这里的创建主题指令改为:

B.查询现在已经存在的topic:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

 C.删除创建的topic:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

 拓展:

这里如果直接删除,则会输出    Topic *** is marked for deletion   如上图,如果我们topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,则会需要彻底清理一下kafka topic。

方法一:修改kafaka配置文件server.properties, 添加 delete.topic.enable=true,重启kafka,之后通过kafka命令行就可以直接删除topic。

方法二:通过命令行删除topic:  ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}

  因为kafaka配置文件中server.properties没有配置delete.topic.enable=true,此时的删除并不是真正的删除,只是把topic标记为:marked for deletion     你可以通过命令:./bin/kafka-topics --zookeeper {zookeeper server} --list 来查看所有topic

方法三:若需要真正删除它,需要登录zookeeper客户端:

找到topic所在的目录:

执行命令,即可,此时topic被彻底删除:

D.修改topic的分区数:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

 F.我们还可以在这里测试分布式是否连接正常:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

可以看到在2-4这台服务器中,我们后面输入  cluster2-4:2181/kafka  与  cluster2-3:2181/kafka  均可得到统一的信息。

topic指令参数:

三、测试producer产生数据、consumer消费数据

之前我们创建好topic以后,这里测试一下如何使用kafka中的kafka-console-producer与kafka-console-consumer来生产数据、另一端消费数据。

还需先了解这里  发布-订阅系统中的代理结构:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

producer产生数据到Topic中,然后consumer从要消费的Topic中消费数据。

首先启动producer:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

 在这里输入数据,这些数据会上传到zookeeper中的 /kafka/broker/test 主题中。

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

 kafka-console-producer 生产者的指令参数:

接着启动消费者:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

后面的   --from-beginning   表示从指定主题中有效的起始位移位置开始消费所有分区的消息。

消费者消费到topic的数据:

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

 kafka-console-consumer 消费者的指令参数:

 查看消费数据后的偏移量  kafka-run-class

获取topic消费组的偏移量

上一篇: C# 环境
下一篇: Serv-U 提权