前言
Kafka是一个分布式、多分区、多副本的消息服务。通过消息队列,生产者和消费者异步交互,而不需要彼此等待。相对于传统的消息服务,Kafka有以下特点:
主题可以通过分区(Partition)来实现水平扩展。
分区分布在多个节点上以达到高数据可用性。
通过消费者组(Consumer Group)来支持单个消费者以队列或者Pub/Sub形式的消息消费,或者多个消费者集群顺序消费消息。
相关配置
1、下载
下载kafka和zookeeper
https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz
https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
解压
# tar -zxvf kafka_2.11-1.0.0.tgz
# mv kafka_2.11-1.0.0 kafka
解压zookeeper
2、配置Zookeeper
Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。
a)创建配置文件
$ mv zoo_sample.cfg zoo.cfg
$ vi conf/zoo.cfg
新增以下配置:
tickTime=2000
dataDir=/usr/software/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2
b)启动ZooKeeper服务器
[[email protected] bin]# ./zkServer.sh start
JMX enabled by default
Using config: /usr/software/zookeeper/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
3、启动KafKa
您可以通过给出以下命令来启动服务器 -
$ ./bin/kafka-server-start.sh config/server.properties &
[2017-11-12 22:30:27,960] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2017-11-12 22:30:27,988] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-11-12 22:30:27,993] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2017-11-12 22:30:27,999] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
启动Kafka Broker后,在终端上键入命令 jps ,看到以下响应 -
[[email protected] bin]# jps
2133 Kafka
1762 QuorumPeerMain
2434 Jps
现在可以看到两个守护进程运行在终端上,QuorumPeerMain是ZooKeeper守护进程,另一个是Kafka守护进程
停止KaFKa服务器
执行所有操作后,可以使用以下命令停止服务器 -
$ ./kafka-server-stop.sh config/server.properties
4、 创建 KafKa主题topic
单节点 - 单代理配置:一个ZooKeeper和代理id实例,以下是配置它的步骤:
a)创建主题 test_kafka
创建Kafka主题 - Kafka提供了一个名为 kafka-topics.sh 的命令行实用程序,用于在服务器上创建主题。
示例
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_kafka
创建名为test_kafka的主题topic
主题列表
要获取Kafka服务器中的主题列表,可以使用以下命令 -
语法
./bin/kafka-topics.sh --list --zookeeper 192.168.1.114:2181
输出
test_kafka
b)启动生产者以发送消息
语法
./bin/kafka-console-producer.sh --broker-list 192.168.1.114:9092 --topic topic-name
从上面的语法,生产者命令行客户端需要两个主要参数 -
代理列表 - 我们要发送邮件的代理列表。 在这种情况下,我们只有一个代理。 Config / server.properties文件包含代理端口ID,因为我们知道我们的代理正在侦听端口9092,因此可以直接指定它。
主题名称 - 以下是主题名称的示例。
示例
[[email protected] kafka]# ./bin/kafka-console-producer.sh --broker-list 192.168.1.114:9092 --topic test_kafka
>123123123123
>12312321;
>eee
>123123123;;;
>
c)启动消费者以接收消息
与生产者类似,在 /config/consumer.properties 文件中指定了缺省使用者属性。
语法
./bin/kafka-console-consumer.sh --zookeeper 192.168.1.114:2181 —topic topic-name --from-beginning
示例
[[email protected] kafka]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_kafka --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
123123123123
12312321;
eee
123123123;;;
在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。