Kafka群集中的每个主机都运行一个称为代理的服务器,该服务器存储发送到主题的消息并服务于消费者请求。
首先先看服务器安装kafka的实例信息:
注意:
然后正常kafka的指令是 : ./bin/kafka-topics.sh --zookeeper cluster2-4:2181 .......
但是使用CDH安装的kafka则不需要全写出此 ./bin/kafka-topics.sh 部分。只许直接写 kafka-topics 即可,这是很重要的一个区别,使用CDH安装的kafka时候要特别注意一下。
具体有哪些指令可以看此路径下:
接下来测试topic指令,这里我们要先看CDH中配置的这个ZooKeeper Root的Kafka服务范围为: " /kafka "。
所以我们使用topic的指令格式应该都类似:
kafka-topics --zookeeper cluster2-4:2181/kafka ......
A.创建一个名为 test 的主题(Topic):
Or
若是上述中的 ZooKeeper Root 的Kafka服务范围为: " / "。则这里的创建主题指令改为:
B.查询现在已经存在的topic:
C.删除创建的topic:
拓展:
这里如果直接删除,则会输出 Topic *** is marked for deletion 如上图,如果我们topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,则会需要彻底清理一下kafka topic。
方法一:修改kafaka配置文件server.properties, 添加 delete.topic.enable=true,重启kafka,之后通过kafka命令行就可以直接删除topic。
方法二:通过命令行删除topic: ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}
因为kafaka配置文件中server.properties没有配置delete.topic.enable=true,此时的删除并不是真正的删除,只是把topic标记为:marked for deletion 你可以通过命令:./bin/kafka-topics --zookeeper {zookeeper server} --list 来查看所有topic
方法三:若需要真正删除它,需要登录zookeeper客户端:
找到topic所在的目录:
执行命令,即可,此时topic被彻底删除:
D.修改topic的分区数:
F.我们还可以在这里测试分布式是否连接正常:
可以看到在2-4这台服务器中,我们后面输入 cluster2-4:2181/kafka 与 cluster2-3:2181/kafka 均可得到统一的信息。
topic指令参数:
三、测试producer产生数据、consumer消费数据
之前我们创建好topic以后,这里测试一下如何使用kafka中的kafka-console-producer与kafka-console-consumer来生产数据、另一端消费数据。
还需先了解这里 发布-订阅系统中的代理结构:
producer产生数据到Topic中,然后consumer从要消费的Topic中消费数据。
首先启动producer:
在这里输入数据,这些数据会上传到zookeeper中的 /kafka/broker/test 主题中。
kafka-console-producer 生产者的指令参数:
接着启动消费者:
后面的 --from-beginning 表示从指定主题中有效的起始位移位置开始消费所有分区的消息。
消费者消费到topic的数据:
kafka-console-consumer 消费者的指令参数:
查看消费数据后的偏移量 kafka-run-class
获取topic消费组的偏移量